From e9def63145b6fd648681e2f60a49fa4f01f648ee Mon Sep 17 00:00:00 2001 From: "weidan.kong" Date: Tue, 14 Jun 2022 06:37:47 +0800 Subject: [PATCH 1/2] [FEAT]: resume waiting/running, dedup on tuner side (TPE-only) --- nni/algorithms/hpo/tpe_tuner.py | 16 +++++- nni/common/hpo_utils/dedup.py | 6 ++ nni/runtime/msg_dispatcher.py | 13 ++++- nni/tuner.py | 8 +++ ts/nni_manager/common/datastore.ts | 2 +- ts/nni_manager/common/trainingService.ts | 1 + ts/nni_manager/core/nnimanager.ts | 55 +++++++++++++++++-- ts/nni_manager/test/mock/datastore.ts | 2 + .../kubernetes/adl/adlTrainingService.ts | 2 +- .../frameworkcontrollerTrainingService.ts | 2 +- .../kubeflow/kubeflowTrainingService.ts | 2 +- .../local/localTrainingService.ts | 2 +- .../pai/paiTrainingService.ts | 2 +- .../remoteMachineTrainingService.ts | 2 +- .../reusable/trialDispatcher.ts | 2 +- 15 files changed, 99 insertions(+), 18 deletions(-) diff --git a/nni/algorithms/hpo/tpe_tuner.py b/nni/algorithms/hpo/tpe_tuner.py index cf51a74182..d7162e7d76 100644 --- a/nni/algorithms/hpo/tpe_tuner.py +++ b/nni/algorithms/hpo/tpe_tuner.py @@ -215,7 +215,21 @@ def import_data(self, data): # for resuming experiment loss = -loss for key, value in param.items(): self._history[key].append(Record(value, loss)) - _logger.info(f'Replayed {len(data)} trials') + self.dedup.add_history(param) + _logger.info(f'Replayed {len(data)} FINISHED trials') + + def import_customized_data(self, data): # for dedup customized / resumed + if isinstance(data, str): + data = nni.load(data) + + for trial in data: + # {'parameter_id': 0, 'parameter_source': 'resumed', 'parameters': {'batch_size': 128, ...} + if isinstance(trial, str): + trial = nni.load(trial) + param = format_parameters(trial['parameters'], self.space) + self._running_params[trial['parameter_id']] = param + self.dedup.add_history(param) + _logger.info(f'Replayed {len(data)} RUNING/WAITING trials') def suggest(args, rng, space, history): params = {} diff --git 
a/nni/common/hpo_utils/dedup.py b/nni/common/hpo_utils/dedup.py index cc07725141..e9d144fbc9 100644 --- a/nni/common/hpo_utils/dedup.py +++ b/nni/common/hpo_utils/dedup.py @@ -79,6 +79,12 @@ def _not_dup(self, formatted_parameters: FormattedParameters) -> bool: self._history.add(params_str) return True + def add_history(self, formatted_parameters: FormattedParameters) -> None: + params = deformat_parameters(formatted_parameters, self._space) + params_str = typing.cast(str, nni.dump(params, sort_keys=True)) + if params_str not in self._history: + self._history.add(params_str) + def _spec_never_dup(spec: ParameterSpec) -> bool: if spec.is_nested(): return False # "not chosen" duplicates with "not chosen" diff --git a/nni/runtime/msg_dispatcher.py b/nni/runtime/msg_dispatcher.py index df59a48a58..b337d879db 100644 --- a/nni/runtime/msg_dispatcher.py +++ b/nni/runtime/msg_dispatcher.py @@ -121,8 +121,14 @@ def handle_import_data(self, data): def handle_add_customized_trial(self, data): # data: parameters - id_ = _create_parameter_id() - _customized_parameter_ids.add(id_) + if not isinstance(data, list): + data = [data] + + for _ in data: + id_ = _create_parameter_id() + _customized_parameter_ids.add(id_) + + self.tuner.import_customized_data(data) def handle_report_metric_data(self, data): """ @@ -187,7 +193,8 @@ def _handle_final_metric_data(self, data): self.tuner.receive_trial_result(id_, _trial_params[id_], value, customized=customized, trial_job_id=data.get('trial_job_id')) else: - _logger.warning('Find unknown job parameter id %s, maybe something goes wrong.', _trial_params[id_]) + _logger.warning('Find unknown job parameter id %s, maybe something goes wrong.', id_) + _logger.warning('_trial_params %s', _trial_params) def _handle_intermediate_metric_data(self, data): """Call assessor to process intermediate results diff --git a/nni/tuner.py b/nni/tuner.py index 87b168db65..c94e68043a 100644 --- a/nni/tuner.py +++ b/nni/tuner.py @@ -219,6 +219,14 @@ def 
import_data(self, data: list[TrialRecord]) -> None: # data: a list of dictionarys, each of which has at least two keys, 'parameter' and 'value' pass + def import_customized_data(self, data: list[TrialRecord]) -> None: + """ + Internal API under revision, not recommended for end users. + """ + # Import resume data for avoiding duplications + # data: a list of dictionaries, each of which has at least two keys, 'parameter_id' and 'parameters' + pass + def _on_exit(self) -> None: pass diff --git a/ts/nni_manager/common/datastore.ts b/ts/nni_manager/common/datastore.ts index eecd277699..754ec4d70b 100644 --- a/ts/nni_manager/common/datastore.ts +++ b/ts/nni_manager/common/datastore.ts @@ -4,7 +4,7 @@ import { ExperimentProfile, TrialJobStatistics } from './manager'; import { TrialJobDetail, TrialJobStatus } from './trainingService'; -type TrialJobEvent = TrialJobStatus | 'USER_TO_CANCEL' | 'ADD_CUSTOMIZED' | 'ADD_HYPERPARAMETER' | 'IMPORT_DATA'; +type TrialJobEvent = TrialJobStatus | 'USER_TO_CANCEL' | 'ADD_CUSTOMIZED' | 'ADD_HYPERPARAMETER' | 'IMPORT_DATA' |'ADD_RESUMED'; type MetricType = 'PERIODICAL' | 'FINAL' | 'CUSTOM' | 'REQUEST_PARAMETER'; interface ExperimentProfileRecord { diff --git a/ts/nni_manager/common/trainingService.ts b/ts/nni_manager/common/trainingService.ts index 057a797932..b13035f63e 100644 --- a/ts/nni_manager/common/trainingService.ts +++ b/ts/nni_manager/common/trainingService.ts @@ -34,6 +34,7 @@ interface TrialJobApplicationForm { readonly sequenceId: number; readonly hyperParameters: HyperParameters; readonly placementConstraint?: PlacementConstraint; + id?: string; } interface TrialCommandContent { diff --git a/ts/nni_manager/core/nnimanager.ts b/ts/nni_manager/core/nnimanager.ts index 0cd19b01ac..1cd76fdd68 100644 --- a/ts/nni_manager/core/nnimanager.ts +++ b/ts/nni_manager/core/nnimanager.ts @@ -23,7 +23,7 @@ import { import { delay, getCheckpointDir, getExperimentRootDir, getLogDir, getMsgDispatcherCommand, mkDirP, getTunerProc, 
getLogLevel, isAlive, killPid } from '../common/utils'; import { INITIALIZE, INITIALIZED, KILL_TRIAL_JOB, NEW_TRIAL_JOB, NO_MORE_TRIAL_JOBS, PING, - REPORT_METRIC_DATA, REQUEST_TRIAL_JOBS, SEND_TRIAL_JOB_PARAMETER, TERMINATE, TRIAL_END, UPDATE_SEARCH_SPACE, IMPORT_DATA + REPORT_METRIC_DATA, REQUEST_TRIAL_JOBS, SEND_TRIAL_JOB_PARAMETER, TERMINATE, TRIAL_END, UPDATE_SEARCH_SPACE, IMPORT_DATA, ADD_CUSTOMIZED_TRIAL_JOB } from './commands'; import { createDispatcherInterface, IpcInterface } from './ipcInterface'; @@ -43,6 +43,7 @@ class NNIManager implements Manager { private waitingTrials: TrialJobApplicationForm[]; private trialJobs: Map; private trialDataForTuner: string; + private trialDataForResume: string; private readonly: boolean; private config!: ExperimentConfig; @@ -55,6 +56,7 @@ class NNIManager implements Manager { this.waitingTrials = []; this.trialJobs = new Map(); this.trialDataForTuner = ''; + this.trialDataForResume = ''; this.readonly = false; this.log = getLogger('NNIManager'); @@ -118,6 +120,45 @@ class NNIManager implements Manager { return this.dataStore.exportTrialHpConfigs(); } + public addRecoveredTrialJob(allTrialJobs: Array): void { + const jobs: Array = allTrialJobs.filter((job: TrialJobInfo) => job.status === 'WAITING' || job.status === 'RUNNING'); + const trialData: any[] = []; + let maxSequeceId = 0; + for (const job of jobs) { + if (job.sequenceId === undefined || job.hyperParameters === undefined) { + this.log.warning('The trial to be recovered missing sequenceId and/or hyperParameters', job); + continue; + } + const params: string = job.hyperParameters[0]; + const sequenceId: number = job.sequenceId; + maxSequeceId = Math.max(maxSequeceId, sequenceId); + + const hyperParams = JSON.parse(params); + const packedParameter = { + parameter_id: hyperParams['parameter_id'], // eslint-disable-line @typescript-eslint/camelcase + parameter_source: 'resumed', // eslint-disable-line @typescript-eslint/camelcase + parameters: 
hyperParams['parameters'], + parameter_index: hyperParams['parameter_index'], // eslint-disable-line @typescript-eslint/camelcase + } + const form: TrialJobApplicationForm = { + id: job.trialJobId, + sequenceId: sequenceId, + hyperParameters: { + value: JSON.stringify(packedParameter), + index: 0 + }, + }; + + this.waitingTrials.push(form); + trialData.push(packedParameter); + this.dataStore.storeTrialJobEvent('ADD_RESUMED', job.trialJobId, ''); + } + this.trialDataForResume = JSON.stringify(trialData); + + // next sequenceId + this.experimentProfile.nextSequenceId = maxSequeceId + 1; + } + public addCustomizedTrialJob(hyperParams: string): Promise { if (this.readonly) { return Promise.reject(new Error('Error: can not add customized trial job in readonly mode!')); @@ -220,11 +261,7 @@ class NNIManager implements Manager { // Resume currSubmittedTrialNum this.currSubmittedTrialNum = allTrialJobs.length; - - // Check the final status for WAITING and RUNNING jobs - await Promise.all(allTrialJobs - .filter((job: TrialJobInfo) => job.status === 'WAITING' || job.status === 'RUNNING') - .map((job: TrialJobInfo) => this.dataStore.storeTrialJobEvent('FAILED', job.trialJobId))); + this.addRecoveredTrialJob(allTrialJobs); // Collect generated trials and imported trials const finishedTrialData: string = await this.exportData(); @@ -807,6 +844,12 @@ class NNIManager implements Manager { } this.dispatcher.sendCommand(IMPORT_DATA, this.trialDataForTuner); } + if (this.trialDataForResume.length > 0 ) { + if (this.dispatcher === undefined) { + throw new Error('Dispatcher error: tuner has not been setup'); + } + this.dispatcher.sendCommand(ADD_CUSTOMIZED_TRIAL_JOB, this.trialDataForResume); + } this.requestTrialJobs(this.experimentProfile.params.trialConcurrency); break; } diff --git a/ts/nni_manager/test/mock/datastore.ts b/ts/nni_manager/test/mock/datastore.ts index 1bea68d92d..cba78d1c9a 100644 --- a/ts/nni_manager/test/mock/datastore.ts +++ 
b/ts/nni_manager/test/mock/datastore.ts @@ -229,6 +229,8 @@ class MockedDataStore implements DataStore { return 'USER_CANCELED'; case 'ADD_CUSTOMIZED': return 'WAITING'; + case 'ADD_RESUMED': + return 'WAITING'; } return event; } diff --git a/ts/nni_manager/training_service/kubernetes/adl/adlTrainingService.ts b/ts/nni_manager/training_service/kubernetes/adl/adlTrainingService.ts index 396bbb36dc..44eb56327c 100644 --- a/ts/nni_manager/training_service/kubernetes/adl/adlTrainingService.ts +++ b/ts/nni_manager/training_service/kubernetes/adl/adlTrainingService.ts @@ -116,7 +116,7 @@ class AdlTrainingService extends KubernetesTrainingService implements Kubernetes this.kubernetesRestServerPort = restServer.clusterRestServerPort; } - const trialJobId: string = uniqueString(5); + const trialJobId: string = form.id === undefined ? uniqueString(5) : form.id; const adlJobName: string = `nni-exp-${this.experimentId}-trial-${trialJobId}`.toLowerCase(); const initStatus: TrialJobStatus = 'WAITING'; const codeDir = this.adlTrialConfig.codeDir; diff --git a/ts/nni_manager/training_service/kubernetes/frameworkcontroller/frameworkcontrollerTrainingService.ts b/ts/nni_manager/training_service/kubernetes/frameworkcontroller/frameworkcontrollerTrainingService.ts index 0acb5a0101..19961e9e3e 100644 --- a/ts/nni_manager/training_service/kubernetes/frameworkcontroller/frameworkcontrollerTrainingService.ts +++ b/ts/nni_manager/training_service/kubernetes/frameworkcontroller/frameworkcontrollerTrainingService.ts @@ -131,7 +131,7 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple await this.copyExpCodeDirPromise; } - const trialJobId: string = uniqueString(5); + const trialJobId: string = form.id === undefined ? 
uniqueString(5) : form.id; // Set trial's NFS working folder const trialWorkingFolder: string = path.join(this.CONTAINER_MOUNT_PATH, 'nni', getExperimentId(), trialJobId); const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials', trialJobId); diff --git a/ts/nni_manager/training_service/kubernetes/kubeflow/kubeflowTrainingService.ts b/ts/nni_manager/training_service/kubernetes/kubeflow/kubeflowTrainingService.ts index bdcac8645f..7051b88ca2 100644 --- a/ts/nni_manager/training_service/kubernetes/kubeflow/kubeflowTrainingService.ts +++ b/ts/nni_manager/training_service/kubernetes/kubeflow/kubeflowTrainingService.ts @@ -78,7 +78,7 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber await this.copyExpCodeDirPromise; } - const trialJobId: string = uniqueString(5); + const trialJobId: string = form.id === undefined ? uniqueString(5) : form.id; const trialWorkingFolder: string = path.join(this.CONTAINER_MOUNT_PATH, 'nni', getExperimentId(), trialJobId); const kubeflowJobName: string = `nni-exp-${this.experimentId}-trial-${trialJobId}`.toLowerCase(); const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials', trialJobId); diff --git a/ts/nni_manager/training_service/local/localTrainingService.ts b/ts/nni_manager/training_service/local/localTrainingService.ts index 0cd9da9247..4e78cb3fe2 100644 --- a/ts/nni_manager/training_service/local/localTrainingService.ts +++ b/ts/nni_manager/training_service/local/localTrainingService.ts @@ -193,7 +193,7 @@ class LocalTrainingService implements TrainingService { } public submitTrialJob(form: TrialJobApplicationForm): Promise { - const trialJobId: string = uniqueString(5); + const trialJobId: string = form.id === undefined ? 
uniqueString(5) : form.id; const trialJobDetail: LocalTrialJobDetail = new LocalTrialJobDetail( trialJobId, 'WAITING', diff --git a/ts/nni_manager/training_service/pai/paiTrainingService.ts b/ts/nni_manager/training_service/pai/paiTrainingService.ts index 3b0fee940a..ab0dc93e92 100644 --- a/ts/nni_manager/training_service/pai/paiTrainingService.ts +++ b/ts/nni_manager/training_service/pai/paiTrainingService.ts @@ -248,7 +248,7 @@ class PAITrainingService implements TrainingService { public async submitTrialJob(form: TrialJobApplicationForm): Promise { this.log.info('submitTrialJob: form:', form); - const trialJobId: string = uniqueString(5); + const trialJobId: string = form.id === undefined ? uniqueString(5) : form.id; //TODO: use HDFS working folder instead const trialWorkingFolder: string = path.join(this.expRootDir, 'trials', trialJobId); const paiJobName: string = `nni_exp_${this.experimentId}_trial_${trialJobId}`; diff --git a/ts/nni_manager/training_service/remote_machine/remoteMachineTrainingService.ts b/ts/nni_manager/training_service/remote_machine/remoteMachineTrainingService.ts index 599f1c54ac..51f9b4ac0a 100644 --- a/ts/nni_manager/training_service/remote_machine/remoteMachineTrainingService.ts +++ b/ts/nni_manager/training_service/remote_machine/remoteMachineTrainingService.ts @@ -226,7 +226,7 @@ class RemoteMachineTrainingService implements TrainingService { */ public async submitTrialJob(form: TrialJobApplicationForm): Promise { // Generate trial job id(random) - const trialJobId: string = uniqueString(5); + const trialJobId: string = form.id === undefined ? 
uniqueString(5) : form.id; const trialJobDetail: RemoteMachineTrialJobDetail = new RemoteMachineTrialJobDetail( trialJobId, diff --git a/ts/nni_manager/training_service/reusable/trialDispatcher.ts b/ts/nni_manager/training_service/reusable/trialDispatcher.ts index 13064435ff..65ce50a163 100644 --- a/ts/nni_manager/training_service/reusable/trialDispatcher.ts +++ b/ts/nni_manager/training_service/reusable/trialDispatcher.ts @@ -160,7 +160,7 @@ class TrialDispatcher implements TrainingService { } public async submitTrialJob(form: TrialJobApplicationForm): Promise { - const trialId: string = uniqueString(5); + const trialId: string = form.id === undefined ? uniqueString(5) : form.id; const trialJobDetail: TrialDetail = new TrialDetail(trialId, "WAITING", Date.now(), "", form); From d928664983b347938ef6c1b6c77daa330a5eb334 Mon Sep 17 00:00:00 2001 From: "weidan.kong" Date: Wed, 15 Jun 2022 01:40:40 +0800 Subject: [PATCH 2/2] [UT]: TPE history fix caused issue - add one more choice --- test/ut/sdk/test_builtin_tuners.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/ut/sdk/test_builtin_tuners.py b/test/ut/sdk/test_builtin_tuners.py index 65f85a0c2d..5187bfd588 100644 --- a/test/ut/sdk/test_builtin_tuners.py +++ b/test/ut/sdk/test_builtin_tuners.py @@ -272,7 +272,7 @@ def import_data_test(self, tuner_factory, stype="choice_str", support_middle=Tru search_space = { "choice_str": { "_type": "choice", - "_value": ["cat", "dog", "elephant", "cow", "sheep", "panda"] + "_value": ["cat", "dog", "elephant", "cow", "sheep", "panda", "tiger"] } } elif stype == "choice_num":