Skip to content
This repository has been archived by the owner on Sep 18, 2024. It is now read-only.

[FEAT]: resume waiting/running, dedup on tuner side (TPE-only) #4931

Merged
merged 2 commits into from
Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion nni/algorithms/hpo/tpe_tuner.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,21 @@ def import_data(self, data): # for resuming experiment
loss = -loss
for key, value in param.items():
self._history[key].append(Record(value, loss))
_logger.info(f'Replayed {len(data)} trials')
self.dedup.add_history(param)
_logger.info(f'Replayed {len(data)} FINISHED trials')

def import_customized_data(self, data): # for dedup customized / resumed
if isinstance(data, str):
data = nni.load(data)

for trial in data:
# {'parameter_id': 0, 'parameter_source': 'resumed', 'parameters': {'batch_size': 128, ...}
if isinstance(trial, str):
trial = nni.load(trial)
param = format_parameters(trial['parameters'], self.space)
self._running_params[trial['parameter_id']] = param
self.dedup.add_history(param)
_logger.info(f'Replayed {len(data)} RUNING/WAITING trials')

def suggest(args, rng, space, history):
params = {}
Expand Down
6 changes: 6 additions & 0 deletions nni/common/hpo_utils/dedup.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ def _not_dup(self, formatted_parameters: FormattedParameters) -> bool:
self._history.add(params_str)
return True

def add_history(self, formatted_parameters: FormattedParameters) -> None:
params = deformat_parameters(formatted_parameters, self._space)
params_str = typing.cast(str, nni.dump(params, sort_keys=True))
if params_str not in self._history:
self._history.add(params_str)

def _spec_never_dup(spec: ParameterSpec) -> bool:
if spec.is_nested():
return False # "not chosen" duplicates with "not chosen"
Expand Down
13 changes: 10 additions & 3 deletions nni/runtime/msg_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,14 @@ def handle_import_data(self, data):

def handle_add_customized_trial(self, data):
# data: parameters
id_ = _create_parameter_id()
_customized_parameter_ids.add(id_)
if not isinstance(data, list):
data = [data]

for _ in data:
id_ = _create_parameter_id()
_customized_parameter_ids.add(id_)

self.tuner.import_customized_data(data)

def handle_report_metric_data(self, data):
"""
Expand Down Expand Up @@ -187,7 +193,8 @@ def _handle_final_metric_data(self, data):
self.tuner.receive_trial_result(id_, _trial_params[id_], value, customized=customized,
trial_job_id=data.get('trial_job_id'))
else:
_logger.warning('Find unknown job parameter id %s, maybe something goes wrong.', _trial_params[id_])
_logger.warning('Find unknown job parameter id %s, maybe something goes wrong.', id_)
_logger.warning('_trial_params %s', _trial_params)

def _handle_intermediate_metric_data(self, data):
"""Call assessor to process intermediate results
Expand Down
8 changes: 8 additions & 0 deletions nni/tuner.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,14 @@ def import_data(self, data: list[TrialRecord]) -> None:
# data: a list of dictionarys, each of which has at least two keys, 'parameter' and 'value'
pass

def import_customized_data(self, data: list[TrialRecord]) -> None:
"""
Internal API under revising, not recommended for end users.
"""
# Import resume data for avoiding duplications
# data: a list of dictionarys, each of which has at least two keys, 'parameter_id' and 'parameters'
pass

def _on_exit(self) -> None:
pass

Expand Down
2 changes: 1 addition & 1 deletion test/ut/sdk/test_builtin_tuners.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ def import_data_test(self, tuner_factory, stype="choice_str", support_middle=Tru
search_space = {
"choice_str": {
"_type": "choice",
"_value": ["cat", "dog", "elephant", "cow", "sheep", "panda"]
"_value": ["cat", "dog", "elephant", "cow", "sheep", "panda", "tiger"]
}
}
elif stype == "choice_num":
Expand Down
2 changes: 1 addition & 1 deletion ts/nni_manager/common/datastore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import { ExperimentProfile, TrialJobStatistics } from './manager';
import { TrialJobDetail, TrialJobStatus } from './trainingService';

type TrialJobEvent = TrialJobStatus | 'USER_TO_CANCEL' | 'ADD_CUSTOMIZED' | 'ADD_HYPERPARAMETER' | 'IMPORT_DATA';
type TrialJobEvent = TrialJobStatus | 'USER_TO_CANCEL' | 'ADD_CUSTOMIZED' | 'ADD_HYPERPARAMETER' | 'IMPORT_DATA' |'ADD_RESUMED';
type MetricType = 'PERIODICAL' | 'FINAL' | 'CUSTOM' | 'REQUEST_PARAMETER';

interface ExperimentProfileRecord {
Expand Down
1 change: 1 addition & 0 deletions ts/nni_manager/common/trainingService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ interface TrialJobApplicationForm {
readonly sequenceId: number;
readonly hyperParameters: HyperParameters;
readonly placementConstraint?: PlacementConstraint;
id?: string;
}

interface TrialCommandContent {
Expand Down
55 changes: 49 additions & 6 deletions ts/nni_manager/core/nnimanager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import {
import { delay, getCheckpointDir, getExperimentRootDir, getLogDir, getMsgDispatcherCommand, mkDirP, getTunerProc, getLogLevel, isAlive, killPid } from '../common/utils';
import {
INITIALIZE, INITIALIZED, KILL_TRIAL_JOB, NEW_TRIAL_JOB, NO_MORE_TRIAL_JOBS, PING,
REPORT_METRIC_DATA, REQUEST_TRIAL_JOBS, SEND_TRIAL_JOB_PARAMETER, TERMINATE, TRIAL_END, UPDATE_SEARCH_SPACE, IMPORT_DATA
REPORT_METRIC_DATA, REQUEST_TRIAL_JOBS, SEND_TRIAL_JOB_PARAMETER, TERMINATE, TRIAL_END, UPDATE_SEARCH_SPACE, IMPORT_DATA, ADD_CUSTOMIZED_TRIAL_JOB
} from './commands';
import { createDispatcherInterface, IpcInterface } from './ipcInterface';

Expand All @@ -43,6 +43,7 @@ class NNIManager implements Manager {
private waitingTrials: TrialJobApplicationForm[];
private trialJobs: Map<string, TrialJobDetail>;
private trialDataForTuner: string;
private trialDataForResume: string;
private readonly: boolean;
private config!: ExperimentConfig;

Expand All @@ -55,6 +56,7 @@ class NNIManager implements Manager {
this.waitingTrials = [];
this.trialJobs = new Map<string, TrialJobDetail>();
this.trialDataForTuner = '';
this.trialDataForResume = '';
this.readonly = false;

this.log = getLogger('NNIManager');
Expand Down Expand Up @@ -118,6 +120,45 @@ class NNIManager implements Manager {
return this.dataStore.exportTrialHpConfigs();
}

public addRecoveredTrialJob(allTrialJobs: Array<TrialJobInfo>): void {
const jobs: Array<TrialJobInfo> = allTrialJobs.filter((job: TrialJobInfo) => job.status === 'WAITING' || job.status === 'RUNNING');
const trialData: any[] = [];
let maxSequeceId = 0;
for (const job of jobs) {
if (job.sequenceId === undefined || job.hyperParameters === undefined) {
this.log.warning('The trial to be recovered missing sequenceId and/or hyperParameters', job);
continue;
}
const params: string = job.hyperParameters[0];
const sequenceId: number = job.sequenceId;
maxSequeceId = Math.max(maxSequeceId, sequenceId);

const hyperParams = JSON.parse(params);
const packedParameter = {
parameter_id: hyperParams['parameter_id'], // eslint-disable-line @typescript-eslint/camelcase
parameter_source: 'resumed', // eslint-disable-line @typescript-eslint/camelcase
parameters: hyperParams['parameters'],
parameter_index: hyperParams['parameter_index'], // eslint-disable-line @typescript-eslint/camelcase
}
const form: TrialJobApplicationForm = {
id: job.trialJobId,
sequenceId: sequenceId,
hyperParameters: {
value: JSON.stringify(packedParameter),
index: 0
},
};

this.waitingTrials.push(form);
trialData.push(packedParameter);
this.dataStore.storeTrialJobEvent('ADD_RESUMED', job.trialJobId, '');
}
this.trialDataForResume = JSON.stringify(trialData);

// next sequenceId
this.experimentProfile.nextSequenceId = maxSequeceId + 1;
}

public addCustomizedTrialJob(hyperParams: string): Promise<number> {
if (this.readonly) {
return Promise.reject(new Error('Error: can not add customized trial job in readonly mode!'));
Expand Down Expand Up @@ -220,11 +261,7 @@ class NNIManager implements Manager {

// Resume currSubmittedTrialNum
this.currSubmittedTrialNum = allTrialJobs.length;

// Check the final status for WAITING and RUNNING jobs
await Promise.all(allTrialJobs
.filter((job: TrialJobInfo) => job.status === 'WAITING' || job.status === 'RUNNING')
.map((job: TrialJobInfo) => this.dataStore.storeTrialJobEvent('FAILED', job.trialJobId)));
this.addRecoveredTrialJob(allTrialJobs);

// Collect generated trials and imported trials
const finishedTrialData: string = await this.exportData();
Expand Down Expand Up @@ -807,6 +844,12 @@ class NNIManager implements Manager {
}
this.dispatcher.sendCommand(IMPORT_DATA, this.trialDataForTuner);
}
if (this.trialDataForResume.length > 0 ) {
if (this.dispatcher === undefined) {
throw new Error('Dispatcher error: tuner has not been setup');
}
this.dispatcher.sendCommand(ADD_CUSTOMIZED_TRIAL_JOB, this.trialDataForResume);
}
this.requestTrialJobs(this.experimentProfile.params.trialConcurrency);
break;
}
Expand Down
2 changes: 2 additions & 0 deletions ts/nni_manager/test/mock/datastore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ class MockedDataStore implements DataStore {
return 'USER_CANCELED';
case 'ADD_CUSTOMIZED':
return 'WAITING';
case 'ADD_RESUMED':
return 'WAITING';
}
return <TrialJobStatus>event;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ class AdlTrainingService extends KubernetesTrainingService implements Kubernetes
this.kubernetesRestServerPort = restServer.clusterRestServerPort;
}

const trialJobId: string = uniqueString(5);
const trialJobId: string = form.id === undefined ? uniqueString(5) : form.id;
const adlJobName: string = `nni-exp-${this.experimentId}-trial-${trialJobId}`.toLowerCase();
const initStatus: TrialJobStatus = 'WAITING';
const codeDir = this.adlTrialConfig.codeDir;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
await this.copyExpCodeDirPromise;
}

const trialJobId: string = uniqueString(5);
const trialJobId: string = form.id === undefined ? uniqueString(5) : form.id;
// Set trial's NFS working folder
const trialWorkingFolder: string = path.join(this.CONTAINER_MOUNT_PATH, 'nni', getExperimentId(), trialJobId);
const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials', trialJobId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
await this.copyExpCodeDirPromise;
}

const trialJobId: string = uniqueString(5);
const trialJobId: string = form.id === undefined ? uniqueString(5) : form.id;
const trialWorkingFolder: string = path.join(this.CONTAINER_MOUNT_PATH, 'nni', getExperimentId(), trialJobId);
const kubeflowJobName: string = `nni-exp-${this.experimentId}-trial-${trialJobId}`.toLowerCase();
const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials', trialJobId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ class LocalTrainingService implements TrainingService {
}

public submitTrialJob(form: TrialJobApplicationForm): Promise<TrialJobDetail> {
const trialJobId: string = uniqueString(5);
const trialJobId: string = form.id === undefined ? uniqueString(5) : form.id;
const trialJobDetail: LocalTrialJobDetail = new LocalTrialJobDetail(
trialJobId,
'WAITING',
Expand Down
2 changes: 1 addition & 1 deletion ts/nni_manager/training_service/pai/paiTrainingService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ class PAITrainingService implements TrainingService {
public async submitTrialJob(form: TrialJobApplicationForm): Promise<TrialJobDetail> {
this.log.info('submitTrialJob: form:', form);

const trialJobId: string = uniqueString(5);
const trialJobId: string = form.id === undefined ? uniqueString(5) : form.id;
//TODO: use HDFS working folder instead
const trialWorkingFolder: string = path.join(this.expRootDir, 'trials', trialJobId);
const paiJobName: string = `nni_exp_${this.experimentId}_trial_${trialJobId}`;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ class RemoteMachineTrainingService implements TrainingService {
*/
public async submitTrialJob(form: TrialJobApplicationForm): Promise<TrialJobDetail> {
// Generate trial job id(random)
const trialJobId: string = uniqueString(5);
const trialJobId: string = form.id === undefined ? uniqueString(5) : form.id;

const trialJobDetail: RemoteMachineTrialJobDetail = new RemoteMachineTrialJobDetail(
trialJobId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ class TrialDispatcher implements TrainingService {
}

public async submitTrialJob(form: TrialJobApplicationForm): Promise<TrialDetail> {
const trialId: string = uniqueString(5);
const trialId: string = form.id === undefined ? uniqueString(5) : form.id;

const trialJobDetail: TrialDetail = new TrialDetail(trialId, "WAITING", Date.now(), "", form);

Expand Down