Skip to content

Commit

Permalink
feat: support spot instances and job settings. (#33)
Browse files Browse the repository at this point in the history
* bump to develop version: 0.4.9.dev0

* chore: upgrade trainingservice pop sdk

* feat: training job supports using spot instances

* feat: model recipe supports use_spot_instance

* feat: support advanced settings for jobs

* hack: add `resource_type` field for CreateTrainingJobRequest

* fix type annotation for `resource_type`

* fix: model recipe missing input/output channels
  • Loading branch information
pitt-liang authored Jul 11, 2024
1 parent a561533 commit 2644f04
Show file tree
Hide file tree
Showing 12 changed files with 10,952 additions and 8,804 deletions.
18 changes: 18 additions & 0 deletions pai/api/training_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
CreateTrainingJobRequest,
CreateTrainingJobRequestComputeResource,
CreateTrainingJobRequestComputeResourceInstanceSpec,
CreateTrainingJobRequestComputeResourceSpotSpec,
CreateTrainingJobRequestExperimentConfig,
CreateTrainingJobRequestHyperParameters,
CreateTrainingJobRequestInputChannels,
CreateTrainingJobRequestLabels,
CreateTrainingJobRequestOutputChannels,
CreateTrainingJobRequestScheduler,
CreateTrainingJobRequestSettings,
CreateTrainingJobRequestUserVpc,
CreateTrainingJobResponseBody,
GetTrainingJobRequest,
Expand Down Expand Up @@ -86,8 +88,10 @@ def create(
instance_type,
instance_count,
job_name,
spot_spec: Optional[Dict[str, Any]] = None,
instance_spec: Optional[Dict[str, str]] = None,
resource_id: Optional[str] = None,
resource_type: Optional[str] = None,
hyperparameters: Optional[Dict[str, Any]] = None,
input_channels: Optional[List[Dict[str, Any]]] = None,
output_channels: Optional[List[Dict[str, Any]]] = None,
Expand All @@ -102,6 +106,7 @@ def create(
algorithm_spec: Optional[Dict[str, Any]] = None,
user_vpc_config: Optional[Dict[str, Any]] = None,
experiment_config: Optional[Dict[str, Any]] = None,
settings: Optional[Dict[str, Any]] = None,
) -> str:
"""Create a TrainingJob."""
if algorithm_spec and (
Expand All @@ -126,9 +131,16 @@ def create(
for ch in output_channels
]
if instance_type:
spot_spec = (
CreateTrainingJobRequestComputeResourceSpotSpec().from_map(spot_spec)
if spot_spec
else None
)
compute_resource = CreateTrainingJobRequestComputeResource(
ecs_count=instance_count,
ecs_spec=instance_type,
use_spot_instance=bool(spot_spec),
spot_spec=spot_spec,
)
elif instance_spec:
compute_resource = CreateTrainingJobRequestComputeResource(
Expand Down Expand Up @@ -169,6 +181,7 @@ def create(
compute_resource=compute_resource,
hyper_parameters=hyper_parameters,
input_channels=input_channels,
resource_type=resource_type,
environments=environments,
python_requirements=requirements,
labels=labels,
Expand All @@ -181,6 +194,11 @@ def create(
experiment_config=CreateTrainingJobRequestExperimentConfig().from_map(
experiment_config
),
settings=(
CreateTrainingJobRequestSettings().from_map(settings)
if settings
else None
),
)

resp: CreateTrainingJobResponseBody = self._do_request(
Expand Down
16 changes: 16 additions & 0 deletions pai/estimator.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
DEFAULT_OUTPUT_MODEL_CHANNEL_NAME,
DEFAULT_TENSORBOARD_CHANNEL_NAME,
ExperimentConfig,
ResourceType,
SpotSpec,
UserVpcConfig,
)
from .model import InferenceSpec, Model, ResourceConfig
Expand Down Expand Up @@ -187,11 +189,14 @@ def __init__(
environments: Optional[Dict[str, str]] = None,
requirements: Optional[List[str]] = None,
instance_type: Optional[str] = None,
spot_spec: Optional[SpotSpec] = None,
instance_spec: Optional[Dict] = None,
resource_id: Optional[Dict] = None,
resource_type: Optional[Union[str, ResourceType]] = None,
instance_count: Optional[int] = None,
user_vpc_config: Optional[UserVpcConfig] = None,
experiment_config: Optional[ExperimentConfig] = None,
settings: Optional[Dict[str, Any]] = None,
labels: Optional[Dict[str, str]] = None,
session: Optional[Session] = None,
):
Expand Down Expand Up @@ -252,12 +257,18 @@ def __init__(
'package' or 'package==version'. This is similar to the contents of a requirements.txt file used
in Python projects. If requirements.txt is provided in user code directory, requirements
will override the conflict dependencies directly.
resource_type (str, optional): The resource type used to run the training job.
By default, general computing resource is used. If the resource_type is
'Lingjun', Lingjun computing resource is used.
instance_type (str, optional): The machine instance type used to run the
training job. To view the supported machine instance types, please refer
to the document:
https://help.aliyun.com/document_detail/171758.htm#section-55y-4tq-84y.
If the instance_type is "local", the training job is executed locally
using docker.
spot_spec (:class:`pai.job.SpotSpec`, optional): The specification of the spot
instance used to run the training job. If provided, the training job will
use the spot instance to run the training job.
instance_count (int): The number of machines used to run the training job.
user_vpc_config (:class:`pai.estimator.UserVpcConfig`, optional): The VPC
configuration used to enable the training job instance to connect to the
Expand All @@ -270,6 +281,8 @@ def __init__(
training job and the experiment. If provided, the training job will belong
to the specified experiment, in which case the training job will use
artifact_uri of experiment as default output path. Default to None.
settings (dict, optional): A dictionary that represents the additional settings
for job, such as AIMaster configurations.
labels (Dict[str, str], optional): A dictionary that maps label names to
their values. This optional field allows you to provide a set of labels
that will be applied to the training job.
Expand All @@ -287,11 +300,14 @@ def __init__(
instance_type=instance_type,
instance_count=instance_count,
resource_id=resource_id,
resource_type=resource_type,
spot_spec=spot_spec,
instance_spec=instance_spec,
user_vpc_config=user_vpc_config,
max_run_time=max_run_time,
environments=environments,
requirements=requirements,
settings=settings,
labels=labels,
)

Expand Down
6 changes: 6 additions & 0 deletions pai/job/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
InstanceSpec,
ModelRecipeSpec,
OssLocation,
ResourceType,
SpotSpec,
SpotStrategy,
TrainingJob,
TrainingJobStatus,
UriInput,
Expand All @@ -45,4 +48,7 @@
"ExperimentConfig",
"InstanceSpec",
"UriInput",
"SpotSpec",
"ResourceType",
"SpotStrategy",
]
56 changes: 55 additions & 1 deletion pai/job/_training_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import time
import typing
from concurrent.futures import ThreadPoolExecutor
from enum import Enum
from typing import Any, Dict, List, Optional, Union

from pydantic import BaseModel, ConfigDict, Field
Expand Down Expand Up @@ -55,6 +56,19 @@ def as_oss_dir_uri(uri: str):
DEFAULT_TENSORBOARD_CHANNEL_NAME = "tensorboard"


class SpotStrategy(str, Enum):
    """Spot instance strategies accepted by ``SpotSpec.spot_strategy``.

    Members are also ``str`` instances, so each one compares equal to its
    plain string value.
    """

    # Strategy that goes together with ``SpotSpec.spot_discount_limit``.
    SpotWithPriceLimit = "SpotWithPriceLimit"
    SpotAsPriceGo = "SpotAsPriceGo"

    def __repr__(self) -> str:
        """Return the bare strategy name instead of the default Enum repr."""
        strategy_name = self.value
        return strategy_name


class ResourceType(str, Enum):
    """Computing resource types a training job can be submitted with.

    Members are also ``str`` instances, so each one compares equal to its
    plain string value.
    """

    # Lingjun computing resource.
    Lingjun = "Lingjun"
    # General computing resource.
    General = "General"


class BaseAPIModel(BaseModel):

model_config = ConfigDict(
Expand Down Expand Up @@ -275,11 +289,14 @@ class AlgorithmSpec(BaseAPIModel):
)
hyperparameter_definitions: List[HyperParameterDefinition] = Field(
default_factory=list,
alias="HyperParameter",
alias="HyperParameters",
description="Hyperparameter definitions.",
)
job_type: str = Field(default="PyTorchJob")
code_dir: Optional[CodeDir] = Field(None, description="Source code location.")
customization: Optional[Dict[str, Any]] = Field(
None, description="Whether the algorithm supports customize code."
)


class ModelRecipeSpec(BaseAPIModel):
Expand All @@ -300,6 +317,19 @@ class ModelRecipeSpec(BaseAPIModel):
requirements: Optional[List[str]] = None


class SpotSpec(BaseAPIModel):
    """Specification of the spot instance used to run a training job.

    Attributes:
        spot_strategy: Spot instance strategy; one of the ``SpotStrategy``
            members ('SpotWithPriceLimit' or 'SpotAsPriceGo'). Required.
        spot_discount_limit: Optional discount limit with at most 2 decimal
            places, e.g. 0.5 means 50% off the original price.
    """

    spot_strategy: SpotStrategy = Field(
        ...,
        description="Spot instance strategy, support 'SpotWithPriceLimit', 'SpotAsPriceGo'",
    )
    # NOTE: the description says the limit is required for
    # 'SpotWithPriceLimit', but this class itself does not enforce that.
    spot_discount_limit: Optional[float] = Field(
        None,
        # Adjacent literals are concatenated, so each piece must end with a
        # space; the original ran the last two sentences together.
        description="Spot instance discount limit, maximum 2 decimal places, "
        "required when spot_strategy is 'SpotWithPriceLimit'. "
        "For example, 0.5 means 50% off the original price.",
    )


class TrainingJob(BaseAPIModel):
"""TrainingJob represents a training job in the PAI service."""

Expand Down Expand Up @@ -542,23 +572,29 @@ def __init__(
instance_spec: Optional[Dict] = None,
instance_count: Optional[int] = None,
resource_id: Optional[Dict] = None,
resource_type: Optional[Union[str, ResourceType]] = None,
spot_spec: Optional[SpotSpec] = None,
environments: Optional[Dict] = None,
requirements: Optional[List[str]] = None,
labels: Optional[Dict[str, str]] = None,
settings: Optional[Dict[str, Any]] = None,
):
self.session = get_default_session()
self._training_jobs = []
self.base_job_name = base_job_name or type(self).__name__.lower()
self.output_path = output_path
self.user_vpc_config = user_vpc_config
self.spot_spec = spot_spec
self.experiment_config = experiment_config
self.max_run_time = max_run_time
self.instance_type = instance_type
self.instance_spec = instance_spec
self.instance_count = instance_count or 1
self.resource_id = resource_id
self.resource_type = ResourceType(resource_type) if resource_type else None
self.environments = environments
self.requirements = requirements
self.settings = settings
self.labels = labels

def wait(self, interval: int = 5, show_logs: bool = True, all_jobs: bool = False):
Expand Down Expand Up @@ -704,6 +740,7 @@ def build_outputs(

return [item.model_dump() for item in res]

# TODO: get arguments such as VPCConfig, instance_type, etc. from the instance itself (self).
def _submit(
self,
job_name: str,
Expand All @@ -728,6 +765,20 @@ def _submit(
show_logs: bool = False,
):
session = get_default_session()

if not self.resource_type or self.resource_type == ResourceType.General:
resource_type = None
else:
resource_type = self.resource_type.value

if self.spot_spec:
spot_spec = {
"SpotStrategy": self.spot_spec.spot_strategy.value,
}
if self.spot_spec.spot_discount_limit:
spot_spec["SpotDiscountLimit"] = self.spot_spec.spot_discount_limit
else:
spot_spec = None
training_job_id = session.training_job_api.create(
instance_count=instance_count,
instance_spec=instance_spec.model_dump() if instance_spec else None,
Expand All @@ -738,9 +789,11 @@ def _submit(
if experiment_config and isinstance(experiment_config, ExperimentConfig)
else experiment_config
),
spot_spec=spot_spec,
algorithm_version=algorithm_version,
instance_type=instance_type,
resource_id=resource_id,
resource_type=resource_type,
job_name=job_name,
hyperparameters=hyperparameters,
max_running_in_seconds=max_run_time,
Expand All @@ -751,6 +804,7 @@ def _submit(
user_vpc_config=user_vpc_config,
labels=labels,
environments=environments,
settings=self.settings,
)
training_job = TrainingJob.get(training_job_id)
self._training_jobs.append(training_job)
Expand Down
Loading

0 comments on commit 2644f04

Please sign in to comment.