diff --git a/mkdocs/docs/concepts/snippets/manage-fleets.ext b/mkdocs/docs/concepts/snippets/manage-fleets.ext index b30b4126a8..5680e497d7 100644 --- a/mkdocs/docs/concepts/snippets/manage-fleets.ext +++ b/mkdocs/docs/concepts/snippets/manage-fleets.ext @@ -1,3 +1,34 @@ +### Instances + +Use `instances` to restrict a run to particular existing fleet instances. You can specify +instance names, hostnames or IP addresses, or a fleet name with an instance number. +For the `fleet` form, use `<project name>/<fleet name>` when the fleet belongs to another project: + +
+ +```yaml +instances: + - name: my-fleet-3 + - hostname: 203.0.113.10 + - fleet: my-fleet + instance: 3 +``` + +
+ +??? info "Short syntax" + + To match by instance name, you can use the string shorthand: + + ```yaml + instances: + - my-fleet-3 + ``` + +When `instances` is set, the run is only placed on a matching existing instance and `dstack` +never provisions new instances. If no matching instance is available, the run fails with a +no-capacity error (use `retry` to wait for a targeted instance to free up). + ### Idle duration If the run is submitted to a fleet with `nodes` set to a range and a new instance is provisioned, diff --git a/mkdocs/docs/guides/protips.md b/mkdocs/docs/guides/protips.md index 065556ef11..51b20d4f4b 100644 --- a/mkdocs/docs/guides/protips.md +++ b/mkdocs/docs/guides/protips.md @@ -208,6 +208,42 @@ $ dstack apply -R -f examples/.dstack.yml Or, set [`creation_policy`](../reference/dstack.yml/dev-environment.md#creation_policy) to `reuse` in the run configuration. +### Instances + +Use `instances` to restrict a run to particular existing fleet instances. You can specify +instance names, hostnames or IP addresses, or a fleet name with an instance number. +For the `fleet` form, use `<project name>/<fleet name>` when the fleet belongs to another project: + +
+ +```yaml +type: dev-environment +name: vscode +ide: vscode + +instances: + - name: my-fleet-3 + - hostname: 203.0.113.10 + - fleet: my-fleet + instance: 3 +``` + +
+ +??? info "Short syntax" + + To match by instance name, you can use the string shorthand: + + ```yaml + instances: + - my-fleet-3 + ``` + +When `instances` is set, the run is only placed on a matching existing instance and +`dstack` never provisions new instances. If no matching instance is available, the run +fails with a no-capacity error (use [`retry`](../reference/dstack.yml/dev-environment.md#retry) +to wait for a targeted instance to free up). + ### Idle duration If the run is submitted to a fleet with `nodes` set to a range and a new instance is provisioned, the shorter of the fleet's and run's `idle_duration` is used. diff --git a/mkdocs/docs/reference/dstack.yml/dev-environment.md b/mkdocs/docs/reference/dstack.yml/dev-environment.md index 594b6e65d7..ca1ec03c00 100644 --- a/mkdocs/docs/reference/dstack.yml/dev-environment.md +++ b/mkdocs/docs/reference/dstack.yml/dev-environment.md @@ -34,6 +34,35 @@ The `dev-environment` configuration type allows running [dev environments](../.. type: required: true +### `instances[n]` { #_instances data-toc-label="instances" } + +=== "Instance name" + + #SCHEMA# dstack._internal.core.models.profiles.InstanceNameSelector + overrides: + show_root_heading: false + +=== "Hostname" + + #SCHEMA# dstack._internal.core.models.profiles.InstanceHostnameSelector + overrides: + show_root_heading: false + +=== "Fleet instance" + + #SCHEMA# dstack._internal.core.models.profiles.FleetInstanceSelector + overrides: + show_root_heading: false + +??? 
info "Short syntax" + + The short syntax for `instances` is a list of instance names: + + ```yaml + instances: + - my-fleet-0 + ``` + ### `resources` #SCHEMA# dstack._internal.core.models.resources.ResourcesSpec diff --git a/mkdocs/docs/reference/dstack.yml/service.md b/mkdocs/docs/reference/dstack.yml/service.md index 5ddfe46dd1..c93e5b27a3 100644 --- a/mkdocs/docs/reference/dstack.yml/service.md +++ b/mkdocs/docs/reference/dstack.yml/service.md @@ -114,6 +114,35 @@ The `service` configuration type allows running [services](../../concepts/servic type: required: true +### `instances[n]` { #_instances data-toc-label="instances" } + +=== "Instance name" + + #SCHEMA# dstack._internal.core.models.profiles.InstanceNameSelector + overrides: + show_root_heading: false + +=== "Hostname" + + #SCHEMA# dstack._internal.core.models.profiles.InstanceHostnameSelector + overrides: + show_root_heading: false + +=== "Fleet instance" + + #SCHEMA# dstack._internal.core.models.profiles.FleetInstanceSelector + overrides: + show_root_heading: false + +??? info "Short syntax" + + The short syntax for `instances` is a list of instance names: + + ```yaml + instances: + - my-fleet-0 + ``` + ### `resources` #SCHEMA# dstack._internal.core.models.resources.ResourcesSpec diff --git a/mkdocs/docs/reference/dstack.yml/task.md b/mkdocs/docs/reference/dstack.yml/task.md index 96d05c325d..8091e985d1 100644 --- a/mkdocs/docs/reference/dstack.yml/task.md +++ b/mkdocs/docs/reference/dstack.yml/task.md @@ -34,6 +34,35 @@ The `task` configuration type allows running [tasks](../../concepts/tasks.md). 
type: required: true +### `instances[n]` { #_instances data-toc-label="instances" } + +=== "Instance name" + + #SCHEMA# dstack._internal.core.models.profiles.InstanceNameSelector + overrides: + show_root_heading: false + +=== "Hostname" + + #SCHEMA# dstack._internal.core.models.profiles.InstanceHostnameSelector + overrides: + show_root_heading: false + +=== "Fleet instance" + + #SCHEMA# dstack._internal.core.models.profiles.FleetInstanceSelector + overrides: + show_root_heading: false + +??? info "Short syntax" + + The short syntax for `instances` is a list of instance names: + + ```yaml + instances: + - my-fleet-0 + ``` + ### `resources` #SCHEMA# dstack._internal.core.models.resources.ResourcesSpec diff --git a/src/dstack/_internal/core/compatibility/common.py b/src/dstack/_internal/core/compatibility/common.py index 789e2d120a..8a34b40579 100644 --- a/src/dstack/_internal/core/compatibility/common.py +++ b/src/dstack/_internal/core/compatibility/common.py @@ -10,6 +10,8 @@ def get_profile_excludes(profile: Optional[ProfileParams]) -> IncludeExcludeSetT return excludes if profile.backend_options is None: excludes.add("backend_options") + if profile.instances is None: + excludes.add("instances") return excludes diff --git a/src/dstack/_internal/core/compatibility/runs.py b/src/dstack/_internal/core/compatibility/runs.py index cbed73c5bf..6013a940e2 100644 --- a/src/dstack/_internal/core/compatibility/runs.py +++ b/src/dstack/_internal/core/compatibility/runs.py @@ -83,6 +83,8 @@ def get_run_spec_excludes(run_spec: RunSpec) -> IncludeExcludeDictType: spec_excludes: IncludeExcludeDictType = {} configuration_excludes: IncludeExcludeDictType = {} profile_excludes = get_profile_excludes(run_spec.profile) + for field in get_profile_excludes(run_spec.configuration): + configuration_excludes[field] = True if run_spec.configuration.backend_options is None: configuration_excludes["backend_options"] = True diff --git a/src/dstack/_internal/core/models/profiles.py 
b/src/dstack/_internal/core/models/profiles.py index f1beebd5b9..bc3e5aa86e 100644 --- a/src/dstack/_internal/core/models/profiles.py +++ b/src/dstack/_internal/core/models/profiles.py @@ -234,6 +234,58 @@ def crons(self) -> List[str]: return self.cron +class InstanceNameSelector(CoreModel): + name: Annotated[str, Field(description="The fleet instance name", min_length=1)] + + +class InstanceHostnameSelector(CoreModel): + hostname: Annotated[ + str, Field(description="The fleet instance hostname or IP address", min_length=1) + ] + + +def _parse_fleet_instance_selector_fleet(v: Any) -> Any: + if isinstance(v, str): + return EntityReference.parse(v) + return v + + +class FleetInstanceSelectorConfig(CoreConfig): + @staticmethod + def schema_extra(schema: Dict[str, Any]): + add_extra_schema_types( + schema["properties"]["fleet"], + extra_types=[{"type": "string", "minLength": 1}], + ) + + +class FleetInstanceSelector(generate_dual_core_model(FleetInstanceSelectorConfig)): + fleet: Annotated[ + EntityReference, + Field( + description=( + "The fleet reference. For fleets owned by the current project, specify" + " the fleet name. 
For a fleet from another project, specify" + " `/` or an object with `project` and `name`" + ), + ), + ] + instance: Annotated[int, Field(description="The fleet instance number", ge=0)] + + _validate_fleet = validator("fleet", pre=True, allow_reuse=True)( + _parse_fleet_instance_selector_fleet + ) + + +InstanceSelector = Union[InstanceNameSelector, InstanceHostnameSelector, FleetInstanceSelector] + + +def parse_instance_selector(v: Union[InstanceSelector, str]) -> InstanceSelector: + if isinstance(v, str): + return InstanceNameSelector(name=v) + return v + + class ProfileParamsConfig(CoreConfig): @staticmethod def schema_extra(schema: Dict[str, Any]): @@ -249,6 +301,10 @@ def schema_extra(schema: Dict[str, Any]): schema["properties"]["idle_duration"], extra_types=[{"type": "string"}], ) + add_extra_schema_types( + schema["properties"]["instances"]["items"], + extra_types=[{"type": "string", "minLength": 1}], + ) class ProfileParams(CoreModel): @@ -387,10 +443,23 @@ class ProfileParams(CoreModel): description=( "The fleets considered for reuse." " For fleets owned by the current project, specify fleet names." - " For imported fleets, specify `/`" + " For fleets from another project, specify `/`" ), ), ] = None + instances: Annotated[ + Optional[List[InstanceSelector]], + Field( + description=( + "The specific fleet instances to consider for reuse." + " Each value can be an instance name string (e.g. `my-fleet-0`)" + " or an object with `name`, `hostname`, or `fleet` and `instance`." 
+ " When set, the run is only placed on a matching existing instance" + " and no new instances are provisioned" + ), + min_items=1, + ), + ] = None tags: Annotated[ Optional[Dict[str, str]], Field( @@ -416,6 +485,9 @@ class ProfileParams(CoreModel): parse_idle_duration ) _validate_fleets = validator("fleets", allow_reuse=True, each_item=True)(EntityReference.parse) + _validate_instances = validator("instances", pre=True, allow_reuse=True, each_item=True)( + parse_instance_selector + ) _validate_tags = validator("tags", pre=True, allow_reuse=True)(tags_validator) _validate_backend_options = validator("backend_options", allow_reuse=True)( validate_backend_options diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py index 2869374d55..4afa4601e8 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py @@ -97,6 +97,7 @@ get_instance_offer, get_instance_provisioning_data, is_placeholder_instance, + populate_instances_fleet, switch_instance_status, ) from dstack._internal.server.services.jobs import ( @@ -523,13 +524,22 @@ async def _select_assignment( if fleet_model is None: return _NoFleetAssignment() - if fleet_instances_with_offers: + required_instance_offers = 1 + if context.run.run_spec.merged_profile.instances is not None: + required_instance_offers = len(context.jobs_to_provision) + + if len(fleet_instances_with_offers) >= required_instance_offers: return _ExistingInstanceAssignment( fleet_id=fleet_model.id, master_job_provisioning_data=preconditions.master_job_provisioning_data, volumes=preconditions.prepared_job_volumes.volumes, ) + if context.run.run_spec.merged_profile.instances is not None: + # The run targets specific existing instances. Do not provision new capacity + # if no selected fleet has enough matching available instances. 
+ return _NoFleetAssignment() + return _NewCapacityAssignment(fleet_id=fleet_model.id) @@ -643,10 +653,13 @@ async def _apply_assignment_result( assignment=assignment, fleet_model=fleet_model, ) - if not current_instance_offers: - # If the reusable offers vanished under the fleet lock, retry full - # assignment later instead of forcing new-capacity provisioning in a - # fleet that may no longer be optimal. + required_instance_offers = 1 + if context.run.run_spec.merged_profile.instances is not None: + required_instance_offers = len(context.jobs_to_provision) + if len(current_instance_offers) < required_instance_offers: + # If reusable offers vanished or are no longer enough under the fleet lock, + # retry full assignment later instead of forcing new-capacity provisioning + # in a fleet that may no longer be optimal. await _reset_job_lock_for_retry(session=session, item=item) return @@ -689,10 +702,16 @@ async def _load_submitted_job_context( FleetModel.instances.and_(InstanceModel.deleted == False) ) ) + .options( + joinedload(JobModel.fleet).joinedload(FleetModel.project).load_only(ProjectModel.name) + ) .execution_options(populate_existing=True) ) job_model = res.unique().scalar_one() run = run_model_to_run(run_model) + for fleet_model in (run_model.fleet, job_model.fleet): + if fleet_model is not None: + populate_instances_fleet(fleet_model) job = find_job(run.jobs, job_model.replica_num, job_model.job_num) replica_jobs = find_jobs(run.jobs, replica_num=job_model.replica_num) return _SubmittedJobContext( @@ -772,6 +791,9 @@ async def _fetch_run_model_for_submitted_job( FleetModel.instances.and_(InstanceModel.deleted == False) ) ) + .options( + joinedload(RunModel.fleet).joinedload(FleetModel.project).load_only(ProjectModel.name) + ) .options(contains_eager(RunModel.jobs, alias=job_alias)) .execution_options(populate_existing=True) ) @@ -905,6 +927,16 @@ async def _apply_no_fleet_selection( job_model: JobModel, run: Run, ) -> None: + if 
run.run_spec.merged_profile.instances is not None: + logger.debug("%s: failed to use specified instances", fmt(job_model)) + await _terminate_submitted_job( + session=session, + job_model=job_model, + reason=JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY, + message="Failed to use specified instances", + ) + return + if run.run_spec.merged_profile.fleets is not None: logger.debug("%s: failed to use specified fleets", fmt(job_model)) await _terminate_submitted_job( @@ -1047,6 +1079,7 @@ def _get_current_reusable_instance_offers( fleet_model: FleetModel, ) -> list[tuple[InstanceModel, InstanceOfferWithAvailability]]: return get_instance_offers_in_fleet( + project=context.project, fleet_model=fleet_model, run_spec=context.run.run_spec, job=context.job, diff --git a/src/dstack/_internal/server/services/instances.py b/src/dstack/_internal/server/services/instances.py index ad48ff1f51..bf15c8c903 100644 --- a/src/dstack/_internal/server/services/instances.py +++ b/src/dstack/_internal/server/services/instances.py @@ -8,6 +8,7 @@ from sqlalchemy import and_, exists, false, or_, select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import joinedload, load_only +from sqlalchemy.orm.attributes import set_committed_value from dstack._internal.core.backends.base.offers import ( offer_to_catalog_item, @@ -34,6 +35,10 @@ ) from dstack._internal.core.models.profiles import ( DEFAULT_FLEET_TERMINATION_IDLE_TIME, + FleetInstanceSelector, + InstanceHostnameSelector, + InstanceNameSelector, + InstanceSelector, Profile, TerminationPolicy, ) @@ -375,6 +380,83 @@ def get_instance_ssh_private_keys(instance_model: InstanceModel) -> tuple[str, O return host_private_key, proxy_private_keys[0] +def instance_matches_selectors( + instance: InstanceModel, + selectors: List[InstanceSelector], + *, + project: ProjectModel, +) -> bool: + """ + Check if an instance matches any of the given instance selectors. 
+ + Unqualified fleet references are interpreted as fleets of `project`. + `instance.fleet` (and `fleet.project` for project-qualified references) + must be loaded. + """ + return any( + instance_matches_selector(instance, selector, project=project) for selector in selectors + ) + + +def instance_matches_selector( + instance: InstanceModel, + selector: InstanceSelector, + *, + project: ProjectModel, +) -> bool: + if isinstance(selector, InstanceNameSelector): + return instance.name.lower() == selector.name.lower() + if isinstance(selector, InstanceHostnameSelector): + return instance_matches_hostname_selector(instance, selector) + if isinstance(selector, FleetInstanceSelector): + return _instance_matches_fleet_instance_selector(instance, selector, project=project) + return False + + +def instance_matches_hostname_selector( + instance: InstanceModel, selector: InstanceHostnameSelector +) -> bool: + candidates = set() + jpd = get_instance_provisioning_data(instance) + if jpd is not None: + if jpd.hostname is not None: + candidates.add(jpd.hostname.lower()) + if jpd.internal_ip is not None: + candidates.add(jpd.internal_ip.lower()) + rci = get_instance_remote_connection_info(instance) + if rci is not None: + candidates.add(rci.host.lower()) + return selector.hostname.lower() in candidates + + +def _instance_matches_fleet_instance_selector( + instance: InstanceModel, + selector: FleetInstanceSelector, + *, + project: ProjectModel, +) -> bool: + fleet = instance.fleet + if fleet is None: + return False + if fleet.name.lower() != selector.fleet.name.lower(): + return False + if instance.instance_num != selector.instance: + return False + if selector.fleet.project is None: + return fleet.project_id == project.id + return fleet.project.name.lower() == selector.fleet.project.lower() + + +def populate_instances_fleet(fleet_model: FleetModel) -> None: + """ + Set `instance.fleet` for instances fetched through `FleetModel.instances`. 
+ SQLAlchemy does not populate the reverse many-to-one on load, and instance + selector matching requires it to be loaded. + """ + for instance in fleet_model.instances: + set_committed_value(instance, "fleet", fleet_model) + + def instance_matches_constraints( instance: InstanceModel, *, @@ -423,6 +505,7 @@ def filter_instances( master_job_provisioning_data: Optional[JobProvisioningData] = None, volumes: Optional[List[List[Volume]]] = None, shared: bool = False, + project: Optional[ProjectModel] = None, ) -> List[InstanceModel]: backend_types: Optional[list[BackendType]] = profile.backends regions: Optional[list[str]] = profile.regions @@ -462,11 +545,20 @@ def filter_instances( regions = [r for r in regions if r == master_job_provisioning_data.region] instance_types = profile.instance_types + instance_selectors = profile.instances + if instance_selectors is not None and project is None: + raise ValueError("project must be provided when profile.instances is set") filtered_instances: List[InstanceModel] = [] for instance in instances: if instance.unreachable: continue + if instance_selectors is not None and not instance_matches_selectors( + instance, + instance_selectors, + project=common_utils.get_or_error(project), + ): + continue if instance.health.is_failure(): continue if status is not None and instance.status != status: @@ -508,6 +600,7 @@ def get_shared_instances_with_offers( idle_only: bool = False, multinode: bool = False, volumes: Optional[List[List[Volume]]] = None, + project: Optional[ProjectModel] = None, ) -> list[tuple[InstanceModel, InstanceOfferWithAvailability]]: instances_with_offers: list[tuple[InstanceModel, InstanceOfferWithAvailability]] = [] query_filter = requirements_to_query_filter(requirements) @@ -517,6 +610,7 @@ def get_shared_instances_with_offers( multinode=multinode, volumes=volumes, shared=True, + project=project, ) for instance in filtered_instances: if idle_only and instance.status not in [InstanceStatus.IDLE, 
InstanceStatus.BUSY]: @@ -560,7 +654,11 @@ async def get_pool_instances( ), InstanceModel.deleted == False, ) - .options(joinedload(InstanceModel.fleet)) + .options( + joinedload(InstanceModel.fleet) + .joinedload(FleetModel.project) + .load_only(ProjectModel.name) + ) ) instance_models = list(res.unique().scalars().all()) return instance_models diff --git a/src/dstack/_internal/server/services/runs/plan.py b/src/dstack/_internal/server/services/runs/plan.py index 942bd1302b..ffbfcbf7d8 100644 --- a/src/dstack/_internal/server/services/runs/plan.py +++ b/src/dstack/_internal/server/services/runs/plan.py @@ -16,7 +16,11 @@ InstanceOfferWithAvailability, InstanceStatus, ) -from dstack._internal.core.models.profiles import CreationPolicy, Profile +from dstack._internal.core.models.profiles import ( + CreationPolicy, + FleetInstanceSelector, + Profile, +) from dstack._internal.core.models.runs import ( Job, JobPlan, @@ -45,6 +49,7 @@ get_pool_instances, get_shared_instances_with_offers, is_placeholder_instance, + populate_instances_fleet, ) from dstack._internal.server.services.jobs import ( get_instances_ids_with_detaching_volumes, @@ -210,28 +215,70 @@ async def get_run_candidate_fleet_models_filters( if run_spec.merged_profile.fleets is not None: fleet_conditions = [] for ref in map(EntityReference.parse, run_spec.merged_profile.fleets): - if ref.project is None: - fleet_conditions.append( - and_( - FleetModel.name == ref.name, - FleetModel.project_id == project.id, - ) - ) - else: - fleet_conditions.append( - and_( - FleetModel.name == ref.name, - ProjectModel.name == ref.project, - ) - ) + fleet_conditions.append(_get_fleet_reference_condition(project=project, ref=ref)) fleet_filters.append(or_(*fleet_conditions)) instance_filters = [ InstanceModel.deleted == False, InstanceModel.id.not_in(detaching_instances_ids), ] + fleet_instance_selectors = _get_only_fleet_instance_selectors(run_spec.merged_profile) + if fleet_instance_selectors is not None: + # Name and 
hostname selectors are checked after instances are loaded. + fleet_filters.append( + or_( + *[ + _get_fleet_reference_condition( + project=project, + ref=selector.fleet, + ) + for selector in fleet_instance_selectors + ] + ) + ) + instance_filters.append( + or_( + *[ + and_( + _get_fleet_reference_condition( + project=project, + ref=selector.fleet, + ), + InstanceModel.instance_num == selector.instance, + ) + for selector in fleet_instance_selectors + ] + ) + ) return fleet_filters, instance_filters +def _get_fleet_reference_condition(project: ProjectModel, ref: EntityReference): + if ref.project is None: + return and_( + FleetModel.name == ref.name, + FleetModel.project_id == project.id, + ) + return and_( + FleetModel.name == ref.name, + ProjectModel.name == ref.project, + ) + + +def _get_only_fleet_instance_selectors( + profile: Profile, +) -> Optional[list[FleetInstanceSelector]]: + if profile.instances is None: + return None + fleet_instance_selectors = [] + for selector in profile.instances: + if not isinstance(selector, FleetInstanceSelector): + return None + fleet_instance_selectors.append(selector) + if not fleet_instance_selectors: + return None + return fleet_instance_selectors + + async def select_run_candidate_fleet_models_with_filters( session: AsyncSession, fleet_filters: list, @@ -241,13 +288,21 @@ async def select_run_candidate_fleet_models_with_filters( # Selecting fleets in two queries since Postgres does not allow # locking nullable side of an outer join. So, first lock instances with inner join. # Then select left out fleets without instances. 
+ with_instances_options = [ + contains_eager(FleetModel.instances), + contains_eager(FleetModel.project).load_only(ProjectModel.name), + ] + without_instances_options = [ + noload(FleetModel.instances), + contains_eager(FleetModel.project).load_only(ProjectModel.name), + ] stmt = ( select(FleetModel) .join(FleetModel.project) # can be referenced by fleet_filters .join(FleetModel.instances) .where(*fleet_filters) .where(*instance_filters) - .options(contains_eager(FleetModel.instances)) + .options(*with_instances_options) .execution_options(populate_existing=True) ) if lock_instances: @@ -274,10 +329,12 @@ async def select_run_candidate_fleet_models_with_filters( not_(and_(*instance_filters)), ) ) - .options(noload(FleetModel.instances)) + .options(*without_instances_options) .execution_options(populate_existing=True) ) fleet_models_without_instances = list(res.unique().scalars().all()) + for fleet_model in fleet_models_with_instances: + populate_instances_fleet(fleet_model) return fleet_models_with_instances, fleet_models_without_instances @@ -323,6 +380,7 @@ async def find_optimal_fleet_with_offers( if run_model is not None and run_model.fleet is not None: # Using the fleet that was already chosen by the master job instance_offers = get_instance_offers_in_fleet( + project=project, fleet_model=run_model.fleet, run_spec=run_spec, job=job, @@ -358,6 +416,7 @@ async def find_optimal_fleet_with_offers( continue all_instance_offers = get_instance_offers_in_fleet( + project=project, fleet_model=fleet_model, run_spec=run_spec, job=job, @@ -382,10 +441,14 @@ async def find_optimal_fleet_with_offers( ) ) - # If any candidate fleet has pool capacity, the optimal fleet will be one of - # those, so backend offers from any fleet won't affect selection — skip them entirely when allowed. 
- skip_backend_offers = skip_backend_offers_on_pool_capacity and any( - candidate.has_pool_capacity for candidate in candidates + # Backend offers cannot satisfy a run that targets specific `instances`, + # so they are always skipped for such runs. Otherwise, if any candidate + # fleet has pool capacity, the optimal fleet will be one of those, so + # backend offers from any fleet won't affect selection — skip them + # entirely when allowed. + skip_backend_offers = run_spec.merged_profile.instances is not None or ( + skip_backend_offers_on_pool_capacity + and any(candidate.has_pool_capacity for candidate in candidates) ) # Second step: gather backend offers unless skipped. @@ -486,6 +549,8 @@ def get_instance_offers_in_fleet( fleet_model: FleetModel, run_spec: RunSpec, job: Job, + *, + project: Optional[ProjectModel] = None, master_job_provisioning_data: Optional[JobProvisioningData] = None, volumes: Optional[list[list[Volume]]] = None, exclude_not_available: bool = False, @@ -500,6 +565,7 @@ def get_instance_offers_in_fleet( master_job_provisioning_data=master_job_provisioning_data, volumes=volumes, shared=False, + project=project, ) instances_with_offers = _get_offers_from_instances(nonshared_instances) shared_instances_with_offers = get_shared_instances_with_offers( @@ -508,6 +574,7 @@ def get_instance_offers_in_fleet( requirements=job.job_spec.requirements, multinode=multinode, volumes=volumes, + project=project, ) instances_with_offers.extend(shared_instances_with_offers) instances_with_offers.sort(key=lambda o: o[0].price or 0) @@ -619,6 +686,7 @@ async def _get_pool_offers( requirements=job.job_spec.requirements, volumes=volumes, multinode=multinode, + project=project, ) for offer in shared_instances_with_offers: pool_offers.append(offer) @@ -630,6 +698,7 @@ async def _get_pool_offers( multinode=multinode, volumes=volumes, shared=False, + project=project, ) nonshared_instances_with_offers = _get_offers_from_instances(nonshared_instances) 
pool_offers.extend(nonshared_instances_with_offers) @@ -659,16 +728,18 @@ async def _get_non_fleet_offers( job=job, volumes=volumes, ) - backend_offers = await get_offers_by_requirements( - project=project, - profile=profile, - requirements=job.job_spec.requirements, - exclude_not_available=False, - multinode=is_multinode_job(job), - volumes=volumes, - privileged=job.job_spec.privileged, - instance_mounts=check_run_spec_requires_instance_mounts(run_spec), - ) + backend_offers: list[tuple[Backend, InstanceOfferWithAvailability]] = [] + if profile.instances is None: + backend_offers = await get_offers_by_requirements( + project=project, + profile=profile, + requirements=job.job_spec.requirements, + exclude_not_available=False, + multinode=is_multinode_job(job), + volumes=volumes, + privileged=job.job_spec.privileged, + instance_mounts=check_run_spec_requires_instance_mounts(run_spec), + ) return instance_offers, backend_offers @@ -687,6 +758,9 @@ async def get_backend_offers_in_run_candidate_fleets( It resolves the selected fleets from `run_spec`, requests backend offers in each fleet, merges them, and deduplicates identical backend offers across fleets. """ + if run_spec.merged_profile.instances is not None: + return [] + candidate_fleet_models = await _select_candidate_fleet_models( session=session, project=project, @@ -741,6 +815,7 @@ async def _get_offers_in_run_candidate_fleets( for candidate_fleet_model in candidate_fleet_models: instance_offers.extend( get_instance_offers_in_fleet( + project=project, fleet_model=candidate_fleet_model, run_spec=run_spec, job=job, @@ -810,7 +885,9 @@ def _get_job_plan( ) -> JobPlan: job_offers: list[InstanceOfferWithAvailability] = [] job_offers.extend(offer for _, offer in instance_offers) - if profile.creation_policy == CreationPolicy.REUSE_OR_CREATE: + # When the run targets specific instances, new capacity is never provisioned, + # so backend offers are not actually usable and must not be shown in the plan. 
+ if profile.creation_policy == CreationPolicy.REUSE_OR_CREATE and profile.instances is None: job_offers.extend(offer for _, offer in backend_offers) job_offers.sort(key=lambda offer: not offer.availability.is_available()) remove_job_spec_sensitive_info(job.job_spec) diff --git a/src/dstack/_internal/server/services/runs/spec.py b/src/dstack/_internal/server/services/runs/spec.py index cb989ef5b4..66e91053b8 100644 --- a/src/dstack/_internal/server/services/runs/spec.py +++ b/src/dstack/_internal/server/services/runs/spec.py @@ -115,6 +115,18 @@ def validate_run_spec_and_set_defaults( raise ServerClientError( f"Probe timeout cannot be longer than {settings.MAX_PROBE_TIMEOUT}s" ) + instances = run_spec.merged_profile.instances + # Only multinode tasks require a dedicated instance per node. Service + # replicas may share one instance with enough idle blocks, so they are + # not validated against the instance count. + if instances is not None and run_spec.configuration.type == "task": + nodes_required_num = get_nodes_required_num(run_spec) + if len(instances) < nodes_required_num: + raise ServerClientError( + f"`instances` specifies {len(instances)} instance(s)" + f" but the run requires {nodes_required_num} nodes." + " Specify at least as many instances as nodes." + ) if run_spec.configuration.priority is None: run_spec.configuration.priority = RUN_PRIORITY_DEFAULT # We do not reject top-level `resources` when `replicas` is a list. 
Adding strict checks diff --git a/src/tests/_internal/core/compatibility/test_runs.py b/src/tests/_internal/core/compatibility/test_runs.py new file mode 100644 index 0000000000..516b15d512 --- /dev/null +++ b/src/tests/_internal/core/compatibility/test_runs.py @@ -0,0 +1,66 @@ +from dstack._internal.core.compatibility.fleets import get_fleet_spec_excludes +from dstack._internal.core.compatibility.runs import get_run_spec_excludes +from dstack._internal.core.models.configurations import TaskConfiguration +from dstack._internal.core.models.fleets import FleetConfiguration, FleetSpec +from dstack._internal.core.models.profiles import InstanceNameSelector, Profile +from dstack._internal.core.models.runs import RunSpec + + +class TestGetRunSpecExcludes: + def test_excludes_unset_instances_for_old_servers(self): + run_spec = RunSpec( + configuration=TaskConfiguration(commands=["echo"]), + profile=Profile(name="default"), + ) + + excludes = get_run_spec_excludes(run_spec) + + assert excludes["configuration"]["instances"] is True + assert "instances" in excludes["profile"] + + def test_keeps_configuration_instances_when_set(self): + run_spec = RunSpec( + configuration=TaskConfiguration( + commands=["echo"], + instances=[InstanceNameSelector(name="my-fleet-0")], + ), + profile=Profile(name="default"), + ) + + excludes = get_run_spec_excludes(run_spec) + + assert "instances" not in excludes["configuration"] + assert "instances" in excludes["profile"] + + def test_keeps_profile_instances_when_set(self): + run_spec = RunSpec( + configuration=TaskConfiguration(commands=["echo"]), + profile=Profile( + name="default", + instances=[InstanceNameSelector(name="my-fleet-0")], + ), + ) + + excludes = get_run_spec_excludes(run_spec) + + assert excludes["configuration"]["instances"] is True + assert "instances" not in excludes["profile"] + + +class TestGetFleetSpecExcludes: + def test_excludes_unset_profile_instances_for_old_servers(self): + spec = 
FleetSpec(configuration=FleetConfiguration(), profile=Profile()) + + excludes = get_fleet_spec_excludes(spec) + + assert excludes is not None + assert "instances" in excludes["profile"] + + def test_keeps_profile_instances_when_set(self): + spec = FleetSpec(configuration=FleetConfiguration(), profile=Profile()) + spec.profile.instances = [InstanceNameSelector(name="my-fleet-0")] + + excludes = get_fleet_spec_excludes(spec) + + assert excludes is not None + assert "instances" not in excludes["profile"] diff --git a/src/tests/_internal/core/models/test_profiles.py b/src/tests/_internal/core/models/test_profiles.py index 4a1caf8bf8..326f3eb31f 100644 --- a/src/tests/_internal/core/models/test_profiles.py +++ b/src/tests/_internal/core/models/test_profiles.py @@ -2,7 +2,13 @@ from pydantic import ValidationError from dstack._internal.core.backends.vastai.profile_options import VastAIProfileOptions -from dstack._internal.core.models.profiles import Profile +from dstack._internal.core.models.common import EntityReference +from dstack._internal.core.models.profiles import ( + FleetInstanceSelector, + InstanceHostnameSelector, + InstanceNameSelector, + Profile, +) class TestValidateProfileBackendOptions: @@ -27,3 +33,80 @@ def test_none_backend_options_is_valid(self): def test_empty_list_backend_options_is_valid(self): profile = Profile(backend_options=[]) assert profile.backend_options == [] + + +class TestProfileInstances: + def test_string_is_parsed_as_instance_name_selector(self): + profile = Profile.parse_obj({"instances": ["my-fleet-1"]}) + + assert profile.instances == [InstanceNameSelector(name="my-fleet-1")] + + @pytest.mark.parametrize( + ("value", "expected"), + [ + ({"name": "my-fleet-1"}, InstanceNameSelector(name="my-fleet-1")), + ({"hostname": "worker-1"}, InstanceHostnameSelector(hostname="worker-1")), + ( + {"fleet": "my-fleet", "instance": 3}, + FleetInstanceSelector(fleet="my-fleet", instance=3), + ), + ( + {"fleet": "other-project/my-fleet", "instance": 
3}, + FleetInstanceSelector(fleet="other-project/my-fleet", instance=3), + ), + ], + ) + def test_object_selectors_are_parsed(self, value, expected): + profile = Profile.parse_obj({"instances": [value]}) + + assert profile.instances == [expected] + + @pytest.mark.parametrize( + "value", + [ + "", + {"name": "my-fleet-1", "hostname": "worker-1"}, + {"name": ""}, + {"hostname": ""}, + {"fleet": "", "instance": 0}, + {"fleet": "project/name/extra", "instance": 0}, + {"fleet": "my-fleet"}, + {"fleet": "my-fleet", "instance": -1}, + {"hostname": "worker-1", "extra": "value"}, + ], + ) + def test_invalid_object_selector_is_rejected(self, value): + with pytest.raises(ValidationError): + Profile.parse_obj({"instances": [value]}) + + def test_empty_instances_list_is_rejected(self): + with pytest.raises(ValidationError): + Profile.parse_obj({"instances": []}) + + def test_parses_fleet_selector_string_to_entity_reference(self): + profile = Profile.parse_obj( + {"name": "test", "instances": [{"fleet": "main/my-fleet", "instance": 0}]} + ) + assert profile.instances == [ + FleetInstanceSelector( + fleet=EntityReference(project="main", name="my-fleet"), instance=0 + ) + ] + + def test_parses_fleet_selector_object_notation(self): + profile = Profile.parse_obj( + { + "name": "test", + "instances": [{"fleet": {"project": "main", "name": "my-fleet"}, "instance": 0}], + } + ) + assert profile.instances == [ + FleetInstanceSelector( + fleet=EntityReference(project="main", name="my-fleet"), instance=0 + ) + ] + + @pytest.mark.parametrize("fleet", ["", "a/b/c", "/my-fleet", "my-project/"]) + def test_rejects_invalid_fleet_selector_reference(self, fleet: str): + with pytest.raises(ValidationError): + Profile.parse_obj({"name": "test", "instances": [{"fleet": fleet, "instance": 0}]}) diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py b/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py index fd9cf3b58a..2a22a4b8bd 100644 --- 
a/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py +++ b/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py @@ -17,7 +17,7 @@ from dstack._internal.core.models.fleets import FleetNodesSpec, InstanceGroupPlacement from dstack._internal.core.models.instances import InstanceStatus from dstack._internal.core.models.placement import PlacementGroup -from dstack._internal.core.models.profiles import Profile +from dstack._internal.core.models.profiles import InstanceNameSelector, Profile from dstack._internal.core.models.runs import JobStatus, JobTerminationReason from dstack._internal.core.models.users import GlobalRole from dstack._internal.core.models.volumes import ( @@ -1165,6 +1165,148 @@ async def test_assigns_job_to_specific_fleet( assert job.instance is not None and job.instance.id == instance_2.id assert job.fleet_id == fleet_2.id + async def test_assigns_job_to_specific_instance( + self, test_db, session: AsyncSession, worker: JobSubmittedWorker + ): + project = await create_project(session=session) + user = await create_user(session=session) + repo = await create_repo(session=session, project_id=project.id) + fleet = await create_fleet(session=session, project=project, name="my-fleet") + await create_instance( + session=session, + project=project, + fleet=fleet, + status=InstanceStatus.IDLE, + instance_num=0, + name="my-fleet-0", + ) + instance_1 = await create_instance( + session=session, + project=project, + fleet=fleet, + status=InstanceStatus.IDLE, + instance_num=1, + name="my-fleet-1", + ) + run_spec = get_run_spec( + repo_id=repo.name, + profile=Profile(name="default", instances=[InstanceNameSelector(name="my-fleet-1")]), + ) + run = await create_run( + session=session, + project=project, + repo=repo, + user=user, + run_spec=run_spec, + ) + job = await create_job(session=session, run=run) + + await _process_job(session=session, worker=worker, job_model=job) + + job = await _get_job(session, job.id) + 
assert job.instance_assigned + assert job.instance is not None and job.instance.id == instance_1.id + assert job.fleet_id == fleet.id + + async def test_does_not_assign_targeted_multinode_job_without_enough_instances( + self, test_db, session: AsyncSession, worker: JobSubmittedWorker + ): + project = await create_project(session=session) + user = await create_user(session=session) + repo = await create_repo(session=session, project_id=project.id) + fleet = await create_fleet(session=session, project=project, name="my-fleet") + instance = await create_instance( + session=session, + project=project, + fleet=fleet, + status=InstanceStatus.IDLE, + instance_num=0, + name="my-fleet-0", + ) + run_spec = get_run_spec( + repo_id=repo.name, + configuration=TaskConfiguration(image="debian", nodes=2), + profile=Profile( + name="default", + instances=[ + InstanceNameSelector(name="my-fleet-0"), + InstanceNameSelector(name="my-fleet-1"), + ], + ), + ) + run = await create_run( + session=session, + project=project, + repo=repo, + user=user, + run_spec=run_spec, + ) + master_job = await create_job( + session=session, + run=run, + job_num=0, + waiting_master_job=False, + ) + await create_job( + session=session, + run=run, + job_num=1, + waiting_master_job=True, + ) + + await _process_job(session=session, worker=worker, job_model=master_job) + + master_job = await _get_job(session, master_job.id) + await session.refresh(instance) + assert master_job.status == JobStatus.TERMINATING + assert ( + master_job.termination_reason + == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY + ) + assert not master_job.instance_assigned + assert master_job.instance is None + assert instance.status == InstanceStatus.IDLE + + async def test_does_not_provision_new_capacity_when_instances_specified( + self, test_db, session: AsyncSession, worker: JobSubmittedWorker + ): + project = await create_project(session=session) + user = await create_user(session=session) + repo = await 
create_repo(session=session, project_id=project.id) + # A candidate fleet exists, but no instance matches the selector. + await create_fleet(session=session, project=project) + run_spec = get_run_spec( + repo_id=repo.name, + profile=Profile( + name="default", + instances=[InstanceNameSelector(name="missing-instance")], + ), + ) + run = await create_run( + session=session, + project=project, + repo=repo, + user=user, + run_spec=run_spec, + ) + job = await create_job(session=session, run=run) + + offer = get_instance_offer_with_availability(backend=BackendType.AWS) + with patch("dstack._internal.server.services.backends.get_project_backends") as m: + backend_mock = Mock() + m.return_value = [backend_mock] + backend_mock.TYPE = BackendType.AWS + backend_mock.compute.return_value.get_offers.return_value = [offer] + + await _process_job(session=session, worker=worker, job_model=job) + + await session.refresh(job) + assert job.status == JobStatus.TERMINATING + assert job.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY + # No placeholder instance should be created for a specific-instance target. 
+ res = await session.execute(select(InstanceModel)) + assert res.scalars().all() == [] + async def test_assignment_creates_placeholder_instance_for_new_capacity( self, test_db, session: AsyncSession, worker: JobSubmittedWorker ): diff --git a/src/tests/_internal/server/routers/test_fleets.py b/src/tests/_internal/server/routers/test_fleets.py index 16ce066866..6e231e74bf 100644 --- a/src/tests/_internal/server/routers/test_fleets.py +++ b/src/tests/_internal/server/routers/test_fleets.py @@ -973,6 +973,7 @@ async def test_creates_fleet(self, test_db, session: AsyncSession, client: Async "default": False, "reservation": None, "fleets": None, + "instances": None, "tags": None, "backend_options": None, }, @@ -1093,6 +1094,7 @@ async def test_creates_ssh_fleet(self, test_db, session: AsyncSession, client: A "default": False, "reservation": None, "fleets": None, + "instances": None, "tags": None, "backend_options": None, }, @@ -1312,6 +1314,7 @@ async def test_updates_ssh_fleet(self, test_db, session: AsyncSession, client: A "default": False, "reservation": None, "fleets": None, + "instances": None, "tags": None, "backend_options": None, }, diff --git a/src/tests/_internal/server/routers/test_runs.py b/src/tests/_internal/server/routers/test_runs.py index a46ec93f8d..dc108ce514 100644 --- a/src/tests/_internal/server/routers/test_runs.py +++ b/src/tests/_internal/server/routers/test_runs.py @@ -213,6 +213,7 @@ def get_dev_env_run_plan_dict( "schedule": None, "reservation": None, "fleets": None, + "instances": None, "tags": None, "backend_options": None, "priority": 0, @@ -239,6 +240,7 @@ def get_dev_env_run_plan_dict( "schedule": None, "reservation": None, "fleets": None, + "instances": None, "tags": None, "backend_options": None, }, @@ -458,6 +460,7 @@ def get_dev_env_run_dict( "schedule": None, "reservation": None, "fleets": None, + "instances": None, "tags": None, "backend_options": None, "priority": 0, @@ -484,6 +487,7 @@ def get_dev_env_run_dict( "schedule": None, 
"reservation": None, "fleets": None, + "instances": None, "tags": None, "backend_options": None, }, diff --git a/src/tests/_internal/server/services/runs/test_plan.py b/src/tests/_internal/server/services/runs/test_plan.py index 5836319bc1..e1be5410bc 100644 --- a/src/tests/_internal/server/services/runs/test_plan.py +++ b/src/tests/_internal/server/services/runs/test_plan.py @@ -7,13 +7,25 @@ from dstack._internal.core.models.configurations import TaskConfiguration from dstack._internal.core.models.fleets import FleetNodesSpec, InstanceGroupPlacement from dstack._internal.core.models.instances import InstanceAvailability +from dstack._internal.core.models.profiles import ( + CreationPolicy, + FleetInstanceSelector, + InstanceNameSelector, + Profile, +) from dstack._internal.server.services.jobs import get_jobs_from_run_spec from dstack._internal.server.services.runs.plan import ( _freeze_offer_identity_value, _get_backend_offer_identity, _get_backend_offers_in_fleet, + _get_job_plan, + find_optimal_fleet_with_offers, + get_backend_offers_in_run_candidate_fleets, + get_run_candidate_fleet_models_filters, + select_run_candidate_fleet_models_with_filters, ) from dstack._internal.server.testing.common import ( + create_export, create_fleet, create_instance, create_project, @@ -66,6 +78,53 @@ def test_get_backend_offer_identity_uses_full_offer_payload(self) -> None: assert _get_backend_offer_identity(offer) != _get_backend_offer_identity(different_offer) +class TestGetJobPlan: + @pytest.mark.asyncio + async def test_includes_backend_offers_by_default(self) -> None: + run_spec = get_run_spec( + repo_id="test-repo", + configuration=TaskConfiguration(image="debian", commands=["echo"]), + ) + jobs = await get_jobs_from_run_spec(run_spec=run_spec, secrets={}, replica_num=0) + instance_offer = get_instance_offer_with_availability() + backend_offer = get_instance_offer_with_availability() + + job_plan = _get_job_plan( + instance_offers=[(None, instance_offer)], + 
backend_offers=[(None, backend_offer)], + profile=Profile(name="default", creation_policy=CreationPolicy.REUSE_OR_CREATE), + job=jobs[0], + max_offers=None, + ) + + assert job_plan.total_offers == 2 + + @pytest.mark.asyncio + async def test_excludes_backend_offers_when_instances_specified(self) -> None: + run_spec = get_run_spec( + repo_id="test-repo", + configuration=TaskConfiguration(image="debian", commands=["echo"]), + ) + jobs = await get_jobs_from_run_spec(run_spec=run_spec, secrets={}, replica_num=0) + instance_offer = get_instance_offer_with_availability() + backend_offer = get_instance_offer_with_availability() + + job_plan = _get_job_plan( + instance_offers=[(None, instance_offer)], + backend_offers=[(None, backend_offer)], + profile=Profile( + name="default", + creation_policy=CreationPolicy.REUSE_OR_CREATE, + instances=[InstanceNameSelector(name="my-fleet-0")], + ), + job=jobs[0], + max_offers=None, + ) + + assert job_plan.total_offers == 1 + assert job_plan.offers == [instance_offer] + + class TestGetBackendOffersInFleet: @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) @@ -113,3 +172,156 @@ async def test_keeps_unconstrained_offers_for_non_empty_cluster_fleet_without_el get_offers_by_requirements_mock.await_args.kwargs["master_job_provisioning_data"] is None ) + + +class TestSelectRunCandidateFleetModelsWithFilters: + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + @pytest.mark.parametrize( + ("selector", "expected_fleet_project_name"), + [ + ("same-fleet", "importer-project"), + ("exporter-project/same-fleet", "exporter-project"), + ], + ) + async def test_fleet_instance_selector_narrows_candidate_fleets( + self, + test_db, + session: AsyncSession, + selector: str, + expected_fleet_project_name: str, + ) -> None: + user = await create_user(session=session) + project = await create_project(session=session, owner=user, name="importer-project") + 
exporter_project = await create_project( + session=session, owner=user, name="exporter-project" + ) + local_fleet = await create_fleet(session=session, project=project, name="same-fleet") + exported_fleet = await create_fleet( + session=session, project=exporter_project, name="same-fleet" + ) + unrelated_fleet = await create_fleet( + session=session, project=project, name="unrelated-fleet" + ) + await create_instance( + session=session, + project=project, + fleet=local_fleet, + instance_num=1, + ) + await create_instance( + session=session, + project=exporter_project, + fleet=exported_fleet, + instance_num=1, + ) + await create_instance( + session=session, + project=project, + fleet=unrelated_fleet, + instance_num=1, + ) + await create_export( + session=session, + exporter_project=exporter_project, + importer_projects=[project], + exported_fleets=[exported_fleet], + ) + run_spec = get_run_spec( + repo_id="test-repo", + configuration=TaskConfiguration(image="debian", commands=["echo"]), + profile=Profile(instances=[FleetInstanceSelector(fleet=selector, instance=1)]), + ) + fleet_filters, instance_filters = await get_run_candidate_fleet_models_filters( + session=session, + project=project, + run_model=None, + run_spec=run_spec, + ) + + ( + fleets_with_instances, + fleets_without_instances, + ) = await select_run_candidate_fleet_models_with_filters( + session=session, + fleet_filters=fleet_filters, + instance_filters=instance_filters, + lock_instances=False, + ) + + assert [fleet.project.name for fleet in fleets_with_instances] == [ + expected_fleet_project_name + ] + assert fleets_without_instances == [] + + +class TestFindOptimalFleetWithOffers: + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_skips_backend_offers_when_instances_specified( + self, test_db, session: AsyncSession, monkeypatch: pytest.MonkeyPatch + ) -> None: + user = await create_user(session=session) + project = await 
create_project(session=session, owner=user) + repo = await create_repo(session=session, project_id=project.id) + fleet = await create_fleet(session=session, project=project) + run_spec = get_run_spec( + repo_id=repo.name, + configuration=TaskConfiguration(image="debian", commands=["echo"]), + profile=Profile(instances=[InstanceNameSelector(name="missing-instance")]), + ) + jobs = await get_jobs_from_run_spec(run_spec=run_spec, secrets={}, replica_num=0) + get_backend_offers_in_fleet_mock = AsyncMock() + monkeypatch.setattr( + "dstack._internal.server.services.runs.plan._get_backend_offers_in_fleet", + get_backend_offers_in_fleet_mock, + ) + + fleet_model, instance_offers, backend_offers = await find_optimal_fleet_with_offers( + project=project, + fleet_models=[fleet], + run_model=None, + run_spec=run_spec, + job=jobs[0], + master_job_provisioning_data=None, + volumes=None, + exclude_not_available=False, + ) + + assert fleet_model == fleet + assert instance_offers == [] + assert backend_offers == [] + get_backend_offers_in_fleet_mock.assert_not_awaited() + + +class TestGetBackendOffersInRunCandidateFleets: + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_skips_backend_offers_when_instances_specified( + self, test_db, session: AsyncSession, monkeypatch: pytest.MonkeyPatch + ) -> None: + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + repo = await create_repo(session=session, project_id=project.id) + run_spec = get_run_spec( + repo_id=repo.name, + configuration=TaskConfiguration(image="debian", commands=["echo"]), + profile=Profile(instances=[InstanceNameSelector(name="missing-instance")]), + ) + jobs = await get_jobs_from_run_spec(run_spec=run_spec, secrets={}, replica_num=0) + select_candidate_fleet_models_mock = AsyncMock() + monkeypatch.setattr( + "dstack._internal.server.services.runs.plan._select_candidate_fleet_models", + 
select_candidate_fleet_models_mock, + ) + + offers = await get_backend_offers_in_run_candidate_fleets( + session=session, + project=project, + run_spec=run_spec, + job=jobs[0], + volumes=None, + ) + + assert offers == [] + select_candidate_fleet_models_mock.assert_not_awaited() diff --git a/src/tests/_internal/server/services/runs/test_spec.py b/src/tests/_internal/server/services/runs/test_spec.py index 093ca768cf..d9901121c9 100644 --- a/src/tests/_internal/server/services/runs/test_spec.py +++ b/src/tests/_internal/server/services/runs/test_spec.py @@ -5,9 +5,13 @@ import pytest from dstack._internal.core.errors import ServerClientError -from dstack._internal.core.models.configurations import ServiceConfiguration +from dstack._internal.core.models.configurations import ( + DevEnvironmentConfiguration, + ServiceConfiguration, + TaskConfiguration, +) from dstack._internal.core.models.files import FileArchiveMapping -from dstack._internal.core.models.profiles import Profile, ProfileRetry +from dstack._internal.core.models.profiles import InstanceNameSelector, Profile, ProfileRetry from dstack._internal.core.models.repos.local import LocalRunRepoData from dstack._internal.core.models.runs import RunSpec from dstack._internal.server.services.runs.spec import ( @@ -98,6 +102,65 @@ def test_rejects_negative_retry_duration_for_new_run_specs(self): ) +class TestValidateRunSpecInstances: + def _user(self): + return SimpleNamespace(ssh_public_key="ssh-rsa test") + + def test_rejects_fewer_instances_than_nodes(self): + run_spec = get_run_spec( + repo_id="test-repo", + configuration=TaskConfiguration(commands=["echo"], nodes=2), + profile=Profile( + name="default", + instances=[InstanceNameSelector(name="my-fleet-0")], + ), + ) + + with pytest.raises(ServerClientError, match="instances"): + validate_run_spec_and_set_defaults(self._user(), run_spec) + + def test_allows_matching_instances_and_nodes(self): + run_spec = get_run_spec( + repo_id="test-repo", + 
configuration=TaskConfiguration(commands=["echo"], nodes=2), + profile=Profile( + name="default", + instances=[ + InstanceNameSelector(name="my-fleet-0"), + InstanceNameSelector(name="my-fleet-1"), + ], + ), + ) + + validate_run_spec_and_set_defaults(self._user(), run_spec) + + def test_allows_single_node_with_instances(self): + run_spec = get_run_spec( + repo_id="test-repo", + configuration=DevEnvironmentConfiguration(ide="vscode"), + profile=Profile( + name="default", + instances=[InstanceNameSelector(name="my-fleet-3")], + ), + ) + + validate_run_spec_and_set_defaults(self._user(), run_spec) + + def test_allows_fewer_instances_than_service_replicas(self): + # Service replicas can share a multi-block instance, so the + # instance-count check applies only to multinode tasks. + run_spec = get_run_spec( + repo_id="test-repo", + configuration=ServiceConfiguration(commands=["echo"], port=8000, replicas=2), + profile=Profile( + name="default", + instances=[InstanceNameSelector(name="my-fleet-0")], + ), + ) + + validate_run_spec_and_set_defaults(self._user(), run_spec) + + class TestCheckCanUpdateConfigurationRouterType: def test_sglang_to_dynamo_router_type_change_is_rejected(self): current = _run_spec(_service_configuration(router_type="sglang")) diff --git a/src/tests/_internal/server/services/test_instances.py b/src/tests/_internal/server/services/test_instances.py index 1f5e1fb52b..f5b8a69290 100644 --- a/src/tests/_internal/server/services/test_instances.py +++ b/src/tests/_internal/server/services/test_instances.py @@ -14,19 +14,28 @@ InstanceType, Resources, ) -from dstack._internal.core.models.profiles import Profile +from dstack._internal.core.models.profiles import ( + FleetInstanceSelector, + InstanceHostnameSelector, + InstanceNameSelector, + Profile, +) from dstack._internal.core.models.runs import JobStatus from dstack._internal.server.models import InstanceModel from dstack._internal.server.schemas.runner import TaskListItem, TaskListResponse, TaskStatus 
from dstack._internal.server.services.runner.client import ShimClient from dstack._internal.server.testing.common import ( + create_export, + create_fleet, create_instance, create_job, create_project, create_repo, create_run, create_user, + get_job_provisioning_data, get_kubernetes_volume_configuration, + get_remote_connection_info, get_volume, get_volume_configuration, get_volume_provisioning_data, @@ -202,6 +211,318 @@ async def test_returns_volume_instances_with_az(self, test_db, session: AsyncSes ) assert res == [aws_instance_2] + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_filters_by_instance_name(self, test_db, session: AsyncSession): + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + instance0 = await create_instance( + session=session, + project=project, + instance_num=0, + name="my-cluster-0", + ) + instance1 = await create_instance( + session=session, + project=project, + instance_num=1, + name="my-cluster-1", + ) + instances = [instance0, instance1] + res = instances_services.filter_instances( + instances=instances, + profile=Profile(name="test", instances=[InstanceNameSelector(name="my-cluster-1")]), + project=project, + ) + assert res == [instance1] + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_filters_by_instance_name_case_insensitive(self, test_db, session: AsyncSession): + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + instance0 = await create_instance( + session=session, + project=project, + name="my-cluster-0", + ) + res = instances_services.filter_instances( + instances=[instance0], + profile=Profile(name="test", instances=[InstanceNameSelector(name="MY-CLUSTER-0")]), + project=project, + ) + assert res == [instance0] + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", 
"postgres"], indirect=True) + async def test_filters_by_hostname(self, test_db, session: AsyncSession): + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + instance0 = await create_instance( + session=session, + project=project, + name="my-cluster-0", + job_provisioning_data=get_job_provisioning_data(hostname="10.0.0.7"), + ) + instance1 = await create_instance( + session=session, + project=project, + name="my-cluster-1", + job_provisioning_data=get_job_provisioning_data(hostname="10.0.0.8"), + ) + instances = [instance0, instance1] + res = instances_services.filter_instances( + instances=instances, + profile=Profile( + name="test", + instances=[InstanceHostnameSelector(hostname="10.0.0.8")], + ), + project=project, + ) + assert res == [instance1] + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_filters_by_internal_ip(self, test_db, session: AsyncSession): + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + instance0 = await create_instance( + session=session, + project=project, + name="my-cluster-0", + job_provisioning_data=get_job_provisioning_data( + hostname="203.0.113.7", internal_ip="10.0.0.7" + ), + ) + instance1 = await create_instance( + session=session, + project=project, + name="my-cluster-1", + job_provisioning_data=get_job_provisioning_data( + hostname="203.0.113.8", internal_ip="10.0.0.8" + ), + ) + res = instances_services.filter_instances( + instances=[instance0, instance1], + profile=Profile( + name="test", + instances=[InstanceHostnameSelector(hostname="10.0.0.8")], + ), + project=project, + ) + assert res == [instance1] + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_string_selector_does_not_match_hostname(self, test_db, session: AsyncSession): + user = await create_user(session=session) + project 
= await create_project(session=session, owner=user) + instance = await create_instance( + session=session, + project=project, + name="my-cluster-0", + job_provisioning_data=get_job_provisioning_data(hostname="10.0.0.8"), + ) + res = instances_services.filter_instances( + instances=[instance], + profile=Profile.parse_obj({"name": "test", "instances": ["10.0.0.8"]}), + project=project, + ) + assert res == [] + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_filters_by_ssh_host(self, test_db, session: AsyncSession): + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + instance0 = await create_instance( + session=session, + project=project, + name="my-cluster-0", + remote_connection_info=get_remote_connection_info(host="192.168.1.10"), + ) + instance1 = await create_instance( + session=session, + project=project, + name="my-cluster-1", + remote_connection_info=get_remote_connection_info(host="192.168.1.11"), + ) + instances = [instance0, instance1] + res = instances_services.filter_instances( + instances=instances, + profile=Profile( + name="test", + instances=[InstanceHostnameSelector(hostname="192.168.1.11")], + ), + project=project, + ) + assert res == [instance1] + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_filters_by_fleet_and_instance_number(self, test_db, session: AsyncSession): + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + fleet = await create_fleet(session=session, project=project, name="my-fleet") + instance0 = await create_instance( + session=session, + project=project, + fleet=fleet, + instance_num=0, + name="worker-a", + ) + instance1 = await create_instance( + session=session, + project=project, + fleet=fleet, + instance_num=1, + name="worker-b", + ) + res = instances_services.filter_instances( + 
instances=[instance0, instance1], + profile=Profile( + name="test", + instances=[FleetInstanceSelector(fleet="my-fleet", instance=1)], + ), + project=project, + ) + assert res == [instance1] + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_selectors_require_project(self, test_db, session: AsyncSession): + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + instance = await create_instance(session=session, project=project) + with pytest.raises(ValueError, match="project"): + instances_services.filter_instances( + instances=[instance], + profile=Profile(name="test", instances=[InstanceNameSelector(name="instance")]), + ) + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + @pytest.mark.parametrize( + ("selector", "expected_instance_name"), + [ + ("same-fleet", "local-worker"), + ("exporter-project/same-fleet", "exported-worker"), + ], + ) + async def test_fleet_selector_respects_project_reference( + self, + test_db, + session: AsyncSession, + selector: str, + expected_instance_name: str, + ): + user = await create_user(session=session) + project = await create_project(session=session, owner=user, name="importer-project") + exporter_project = await create_project( + session=session, owner=user, name="exporter-project" + ) + local_fleet = await create_fleet(session=session, project=project, name="same-fleet") + exported_fleet = await create_fleet( + session=session, project=exporter_project, name="same-fleet" + ) + local_instance = await create_instance( + session=session, + project=project, + fleet=local_fleet, + instance_num=1, + name="local-worker", + ) + exported_instance = await create_instance( + session=session, + project=exporter_project, + fleet=exported_fleet, + instance_num=1, + name="exported-worker", + ) + await create_export( + session=session, + exporter_project=exporter_project, + 
importer_projects=[project], + exported_fleets=[exported_fleet], + ) + + res = instances_services.filter_instances( + instances=[local_instance, exported_instance], + profile=Profile( + name="test", + instances=[FleetInstanceSelector(fleet=selector, instance=1)], + ), + project=project, + ) + + assert [i.name for i in res] == [expected_instance_name] + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_no_instances_selector_returns_all(self, test_db, session: AsyncSession): + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + instance0 = await create_instance(session=session, project=project, name="my-cluster-0") + instance1 = await create_instance(session=session, project=project, name="my-cluster-1") + instances = [instance0, instance1] + res = instances_services.filter_instances( + instances=instances, + profile=Profile(name="test", instances=None), + ) + assert res == instances + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_filters_by_instance_name_for_multinode(self, test_db, session: AsyncSession): + # Regression: the selector must also be applied on the multinode filter path. 
+ user = await create_user(session=session) + project = await create_project(session=session, owner=user) + instance0 = await create_instance( + session=session, + project=project, + backend=BackendType.AWS, + name="my-fleet-0", + ) + instance1 = await create_instance( + session=session, + project=project, + backend=BackendType.AWS, + name="my-fleet-1", + ) + res = instances_services.filter_instances( + instances=[instance0, instance1], + profile=Profile(name="test", instances=[InstanceNameSelector(name="my-fleet-1")]), + multinode=True, + project=project, + ) + assert res == [instance1] + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_filters_by_instance_name_for_shared(self, test_db, session: AsyncSession): + # Regression: the selector must also be applied on the shared-instances filter path. + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + instance0 = await create_instance( + session=session, + project=project, + name="my-fleet-0", + total_blocks=2, + ) + instance1 = await create_instance( + session=session, + project=project, + name="my-fleet-1", + total_blocks=2, + ) + res = instances_services.filter_instances( + instances=[instance0, instance1], + profile=Profile(name="test", instances=[InstanceNameSelector(name="my-fleet-1")]), + shared=True, + project=project, + ) + assert res == [instance1] + @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) async def test_returns_volume_instances_without_region(self, test_db, session: AsyncSession):