
Commit

Merge pull request #95 from VIDA-NYU/dqn_implementation
WIP: Replace MCTS with Ray RL Algorithms
EdenWuyifan authored May 1, 2024
2 parents d77a9fe + 7e7b726 · commit b6d685f
Showing 39 changed files with 746 additions and 2,032 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -60,3 +60,6 @@ lightning_logs
.cache
**/smac3_output
/venv
+
+.vscode
+**/.vscode
47 changes: 30 additions & 17 deletions alpha_automl/automl_api.py
@@ -24,7 +24,7 @@ class BaseAutoML():

def __init__(self, time_bound=15, metric=None, split_strategy='holdout', time_bound_run=5, task=None,
score_sorting='auto', metric_kwargs=None, split_strategy_kwargs=None, output_folder=None,
-                 num_cpus=None, start_mode='auto', verbose=logging.INFO):
+                 checkpoints_folder=None, num_cpus=None, start_mode='auto', verbose=logging.INFO):
"""
Create/instantiate a BaseAutoML object.
@@ -39,6 +39,8 @@ def __init__(self, time_bound=15, metric=None, split_strategy='holdout', time_bo
:param metric_kwargs: Additional arguments for metric.
:param split_strategy_kwargs: Additional arguments for splitting_strategy.
:param output_folder: Path to the output directory. If it is None, create a temp folder automatically.
+        :param checkpoints_folder: Path to the directory to load and save the checkpoints. If it is None,
+            it will use the default checkpoints and save the new checkpoints in output_folder.
:param num_cpus: Number of CPUs to be used.
:param start_mode: The mode to start the multiprocessing library. It could be `auto`, `fork` or `spawn`.
:param verbose: The logs level.
@@ -60,11 +62,11 @@ def __init__(self, time_bound=15, metric=None, split_strategy='holdout', time_bo
self.X = None
self.y = None
self.leaderboard = None
-        self.automl_manager = AutoMLManager(self.output_folder, time_bound, time_bound_run, task, num_cpus, verbose)
-        self._start_method = get_start_method(start_mode)
-        set_start_method(self._start_method, force=True)
-        check_input_for_multiprocessing(self._start_method, self.scorer._score_func, 'metric')
-        check_input_for_multiprocessing(self._start_method, self.splitter, 'split strategy')
+        self.automl_manager = AutoMLManager(self.output_folder, checkpoints_folder, time_bound, time_bound_run, task, num_cpus, verbose)
+        #self._start_method = get_start_method(start_mode)
+        #set_start_method(self._start_method, force=True)
+        #check_input_for_multiprocessing(self._start_method, self.scorer._score_func, 'metric')
+        #check_input_for_multiprocessing(self._start_method, self.splitter, 'split strategy')
self.label_encoder = None
self.task_type = task

@@ -194,7 +196,7 @@ def add_primitives(self, new_primitives):
:param new_primitives: Set of new primitives, tuples of name and object primitive
"""
for primitive_object, primitive_type in new_primitives:
-            check_input_for_multiprocessing(self._start_method, primitive_object, 'primitive')
+            #check_input_for_multiprocessing(self._start_method, primitive_object, 'primitive')
primitive_name = f'{primitive_object.__module__}.{primitive_object.__class__.__name__}'
primitive_name = primitive_name.replace('__', '') # Sklearn restriction on estimator names
self.new_primitives[primitive_name] = {'primitive_object': primitive_object,
@@ -297,7 +299,7 @@ class ClassifierBaseAutoML(BaseAutoML):

def __init__(self, time_bound=15, metric='accuracy_score', split_strategy='holdout', time_bound_run=5, task=None,
score_sorting='auto', metric_kwargs=None, split_strategy_kwargs=None, output_folder=None,
-                 num_cpus=None, start_mode='auto', verbose=logging.INFO):
+                 checkpoints_folder=None, num_cpus=None, start_mode='auto', verbose=logging.INFO):
"""
Create/instantiate an AutoMLClassifier object.
@@ -312,13 +314,15 @@ def __init__(self, time_bound=15, metric='accuracy_score', split_strategy='holdo
:param metric_kwargs: Additional arguments for metric.
:param split_strategy_kwargs: Additional arguments for splitting_strategy.
:param output_folder: Path to the output directory. If it is None, create a temp folder automatically.
+        :param checkpoints_folder: Path to the directory to load and save the checkpoints. If it is None,
+            it will use the default checkpoints and save the new checkpoints in output_folder.
:param num_cpus: Number of CPUs to be used.
:param start_mode: The mode to start the multiprocessing library. It could be `auto`, `fork` or `spawn`.
:param verbose: The logs level.
"""

super().__init__(time_bound, metric, split_strategy, time_bound_run, task, score_sorting, metric_kwargs,
-                         split_strategy_kwargs, output_folder, num_cpus, start_mode, verbose)
+                         split_strategy_kwargs, output_folder, checkpoints_folder, num_cpus, start_mode, verbose)

self.label_encoder = LabelEncoder()

@@ -354,7 +358,7 @@ class AutoMLClassifier(ClassifierBaseAutoML):

def __init__(self, time_bound=15, metric='accuracy_score', split_strategy='holdout', time_bound_run=5,
score_sorting='auto', metric_kwargs=None, split_strategy_kwargs=None, output_folder=None,
-                 num_cpus=None, start_mode='auto', verbose=logging.INFO):
+                 checkpoints_folder=None, num_cpus=None, start_mode='auto', verbose=logging.INFO):
"""
Create/instantiate an AutoMLClassifier object.
@@ -368,21 +372,23 @@ def __init__(self, time_bound=15, metric='accuracy_score', split_strategy='holdo
:param metric_kwargs: Additional arguments for metric.
:param split_strategy_kwargs: Additional arguments for splitting_strategy.
:param output_folder: Path to the output directory. If it is None, create a temp folder automatically.
+        :param checkpoints_folder: Path to the directory to load and save the checkpoints. If it is None,
+            it will use the default checkpoints and save the new checkpoints in output_folder.
:param num_cpus: Number of CPUs to be used.
:param start_mode: The mode to start the multiprocessing library. It could be `auto`, `fork` or `spawn`.
:param verbose: The logs level.
"""

task = 'CLASSIFICATION'
super().__init__(time_bound, metric, split_strategy, time_bound_run, task, score_sorting, metric_kwargs,
-                         split_strategy_kwargs, output_folder, num_cpus, start_mode, verbose)
+                         split_strategy_kwargs, output_folder, checkpoints_folder, num_cpus, start_mode, verbose)


class AutoMLRegressor(BaseAutoML):

def __init__(self, time_bound=15, metric='mean_absolute_error', split_strategy='holdout', time_bound_run=5,
score_sorting='auto', metric_kwargs=None, split_strategy_kwargs=None, output_folder=None,
-                 num_cpus=None, start_mode='auto', verbose=logging.INFO):
+                 checkpoints_folder=None, num_cpus=None, start_mode='auto', verbose=logging.INFO):
"""
Create/instantiate an AutoMLRegressor object.
@@ -396,20 +402,23 @@ def __init__(self, time_bound=15, metric='mean_absolute_error', split_strategy='
:param metric_kwargs: Additional arguments for metric.
:param split_strategy_kwargs: Additional arguments for splitting_strategy.
:param output_folder: Path to the output directory. If it is None, create a temp folder automatically.
+        :param checkpoints_folder: Path to the directory to load and save the checkpoints. If it is None,
+            it will use the default checkpoints and save the new checkpoints in output_folder.
:param num_cpus: Number of CPUs to be used.
:param start_mode: The mode to start the multiprocessing library. It could be `auto`, `fork` or `spawn`.
:param verbose: The logs level.
"""

task = 'REGRESSION'
super().__init__(time_bound, metric, split_strategy, time_bound_run, task, score_sorting, metric_kwargs,
-                         split_strategy_kwargs, output_folder, num_cpus, start_mode, verbose)
+                         split_strategy_kwargs, output_folder, checkpoints_folder, num_cpus, start_mode, verbose)


class AutoMLTimeSeries(BaseAutoML):
def __init__(self, time_bound=15, metric='mean_squared_error', split_strategy='timeseries', time_bound_run=5,
score_sorting='auto', metric_kwargs=None, split_strategy_kwargs=None, output_folder=None,
-                 num_cpus=None, start_mode='auto', verbose=logging.INFO, date_column=None, target_column=None):
+                 checkpoints_folder=None, num_cpus=None, start_mode='auto', verbose=logging.INFO, date_column=None,
+                 target_column=None):
"""
Create/instantiate an AutoMLTimeSeries object.
@@ -423,6 +432,8 @@ def __init__(self, time_bound=15, metric='mean_squared_error', split_strategy='t
:param metric_kwargs: Additional arguments for metric.
:param split_strategy_kwargs: Additional arguments for TimeSeriesSplit, E.g. n_splits and test_size(int).
:param output_folder: Path to the output directory. If it is None, create a temp folder automatically.
+        :param checkpoints_folder: Path to the directory to load and save the checkpoints. If it is None,
+            it will use the default checkpoints and save the new checkpoints in output_folder.
:param num_cpus: Number of CPUs to be used.
:param start_mode: The mode to start the multiprocessing library. It could be `auto`, `fork` or `spawn`.
:param verbose: The logs level.
@@ -433,7 +444,7 @@ def __init__(self, time_bound=15, metric='mean_squared_error', split_strategy='t
self.target_column = target_column

super().__init__(time_bound, metric, split_strategy, time_bound_run, task, score_sorting, metric_kwargs,
-                         split_strategy_kwargs, output_folder, num_cpus, start_mode, verbose)
+                         split_strategy_kwargs, output_folder, checkpoints_folder, num_cpus, start_mode, verbose)

def _column_parser(self, X):
cols = list(X.columns.values)
@@ -452,7 +463,7 @@ class AutoMLSemiSupervisedClassifier(ClassifierBaseAutoML):

def __init__(self, time_bound=15, metric='accuracy_score', split_strategy='holdout', time_bound_run=5,
score_sorting='auto', metric_kwargs=None, split_strategy_kwargs=None, output_folder=None,
-                 num_cpus=None, start_mode='auto', verbose=logging.INFO):
+                 checkpoints_folder=None, num_cpus=None, start_mode='auto', verbose=logging.INFO):
"""
Create/instantiate an AutoMLSemiSupervisedClassifier object.
@@ -467,14 +478,16 @@ def __init__(self, time_bound=15, metric='accuracy_score', split_strategy='holdo
:param split_strategy_kwargs: Additional arguments for splitting_strategy. In SemiSupervised case, `n_splits`
and `test_size` (test proportion from 0 to 1) can be passed to the splitter.
:param output_folder: Path to the output directory. If it is None, create a temp folder automatically.
+        :param checkpoints_folder: Path to the directory to load and save the checkpoints. If it is None,
+            it will use the default checkpoints and save the new checkpoints in output_folder.
:param num_cpus: Number of CPUs to be used.
:param start_mode: The mode to start the multiprocessing library. It could be `auto`, `fork` or `spawn`.
:param verbose: The logs level.
"""

task = 'SEMISUPERVISED'
super().__init__(time_bound, metric, split_strategy, time_bound_run, task, score_sorting, metric_kwargs,
-                         split_strategy_kwargs, output_folder, num_cpus, start_mode, verbose)
+                         split_strategy_kwargs, output_folder, checkpoints_folder, num_cpus, start_mode, verbose)

if split_strategy_kwargs is None:
split_strategy_kwargs = {'test_size': 0.25}
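The new `checkpoints_folder` keyword threads through every public AutoML class down to `AutoMLManager`. A minimal usage sketch of the new argument (the toy data and folder paths are illustrative, not part of this PR):

    import pandas as pd
    from alpha_automl import AutoMLClassifier

    # Illustrative toy dataset; only the checkpoints_folder keyword is new in this PR.
    X_train = pd.DataFrame({'a': [1, 2, 3, 4], 'b': [0.1, 0.2, 0.3, 0.4]})
    y_train = pd.Series([0, 1, 0, 1])

    automl = AutoMLClassifier(
        time_bound=15,
        output_folder='tmp/output',
        checkpoints_folder='tmp/checkpoints',  # None -> use default checkpoints, save new ones in output_folder
    )
    automl.fit(X_train, y_train)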
106 changes: 27 additions & 79 deletions alpha_automl/automl_manager.py
@@ -1,7 +1,7 @@
import logging
import time
-import multiprocessing
-import queue as Q
+from alpha_automl.pipeline import Pipeline
from alpha_automl.data_profiler import profile_data
from alpha_automl.scorer import make_splitter, score_pipeline
from alpha_automl.utils import sample_dataset, is_equal_splitting
@@ -22,8 +22,9 @@

class AutoMLManager():

-    def __init__(self, output_folder, time_bound, time_bound_run, task, num_cpus, verbose):
+    def __init__(self, output_folder, checkpoints_folder, time_bound, time_bound_run, task, num_cpus, verbose):
        self.output_folder = output_folder
+        self.checkpoints_folder = checkpoints_folder
self.time_bound = time_bound * 60
self.time_bound_run = time_bound_run * 60
self.task = task
@@ -60,86 +61,33 @@ def _search_pipelines(self, automl_hyperparams):
        if not is_sample and is_equal_splitting(internal_splitting_strategy, self.splitting_strategy):
            need_rescoring = False

-        queue = multiprocessing.Queue()
-        search_process = multiprocessing.Process(target=search_pipelines_proc,
-                                                 args=(X, y, self.scoring, internal_splitting_strategy, self.task,
-                                                       automl_hyperparams, metadata, self.output_folder, self.verbose,
-                                                       queue
-                                                       )
-                                                 )
-
-        search_process.start()
-        self.running_processes += 1
-        num_processes = self.num_cpus - 2  # Exclude the main process and search process
-        scoring_pool = multiprocessing.Pool(max(1, num_processes))
-        pipelines_to_score = []
-        scoring_results = []
-
-        while True:
+        pipelines = search_pipelines_proc(X, y, self.scoring, internal_splitting_strategy, self.task,
+                                          self.time_bound, automl_hyperparams, metadata, self.output_folder,
+                                          self.checkpoints_folder)
+
+        found_pipelines = 0
+
+        pipeline_threshold = 20
+        X, y, _ = sample_dataset(self.X, self.y, SAMPLE_SIZE, self.task)
+        while pipelines and found_pipelines < pipeline_threshold:
+            pipeline = pipelines.pop()
            try:
-                result = queue.get(timeout=10)
-            except Q.Empty:
-                logger.debug('Reached timeout getting new pipelines')
-                result = None
-
-            if result == 'DONE':
-                search_process.terminate()
-                search_process.join(10)
-                scoring_pool.terminate()
-                scoring_pool.join()
-                logger.debug(f'Found {self.found_pipelines} pipelines')
-                logger.debug('Search done')
-                break
-
-            elif result is not None:
-                pipeline = result
-                logger.debug('Found new pipeline')
-                yield {'pipeline': pipeline, 'message': 'FOUND'}
-
-                if need_rescoring:
-                    pipelines_to_score.append(pipeline)
-                else:
-                    logger.debug(f'Pipeline scored successfully, score={pipeline.get_score()}')
-                    self.found_pipelines += 1
-                    yield {'pipeline': pipeline, 'message': 'SCORED'}
-
-            if len(pipelines_to_score) > 0:
-                if self.running_processes < MAX_RUNNING_PROCESSES:
-                    pipeline = pipelines_to_score.pop(0).get_pipeline()
-                    scoring_result = scoring_pool.apply_async(
-                        score_pipeline,
-                        args=(pipeline, self.X, self.y, self.scoring, self.splitting_strategy, self.task, self.verbose)
-                    )
-                    scoring_results.append(scoring_result)
-                    self.running_processes += 1
-
-            tmp_scoring_results = []
-            for scoring_result in scoring_results:
-                if scoring_result.ready():
-                    self.running_processes -= 1
-                    pipeline = scoring_result.get()
-                    if pipeline is not None:
-                        logger.debug(f'Pipeline scored successfully, score={pipeline.get_score()}')
-                        self.found_pipelines += 1
-                        yield {'pipeline': pipeline, 'message': 'SCORED'}
-                else:
-                    tmp_scoring_results.append(scoring_result)
-
-            scoring_results = tmp_scoring_results
-
-            if time.time() > search_start_time + self.time_bound:
-                logger.debug('Reached search timeout')
-                search_process.terminate()
-                search_process.join(10)
-                scoring_pool.terminate()
-                scoring_pool.join()
-                logger.debug(f'Found {self.found_pipelines} pipelines')
-                break
+                alphaautoml_pipeline = score_pipeline(pipeline, X, y, self.scoring, self.splitting_strategy, self.task)
+
+                if alphaautoml_pipeline is not None:
+                    score = alphaautoml_pipeline.get_score()
+                    logger.debug(f'Pipeline scored successfully, score={score}')
+                    found_pipelines += 1
+                    yield {'pipeline': alphaautoml_pipeline, 'message': 'SCORED'}
+            except:
+                logger.debug(f'Pipeline scoring error!')
+                continue
+
+        logger.debug(f'Found {found_pipelines} pipelines')
+        logger.debug('Search done')

    def check_automl_hyperparams(self, automl_hyperparams):
        if 'use_automatic_grammar' not in automl_hyperparams:
            automl_hyperparams['use_automatic_grammar'] = USE_AUTOMATIC_GRAMMAR

        if 'prioritize_primitives' not in automl_hyperparams:
            automl_hyperparams['prioritize_primitives'] = PRIORITIZE_PRIMITIVES
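With the queue and process pool gone, `_search_pipelines` is now a plain generator: `search_pipelines_proc` returns a batch of candidate pipelines up front, and each one is scored in-process on a sample of the data. A hypothetical sketch of how a caller might consume the generator, assuming the caller has already populated `manager.X`/`manager.y` (that wiring is outside this diff):

    import logging

    # Positional arguments follow the new __init__ signature shown above.
    manager = AutoMLManager('tmp/output', 'tmp/checkpoints', 15, 5, 'CLASSIFICATION', 4, logging.INFO)
    # ... manager.X and manager.y are set by the caller before searching ...

    for event in manager._search_pipelines({}):  # yields {'pipeline': ..., 'message': 'SCORED'}
        if event['message'] == 'SCORED':
            print('score:', event['pipeline'].get_score())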
5 changes: 3 additions & 2 deletions alpha_automl/grammar_loader.py
@@ -48,8 +48,9 @@ def create_task_grammar(global_grammar, task):

    for production in global_grammar.productions():
        for new_production in new_productions:
-            if production.lhs() in new_production.rhs() and production not in new_productions:
-                new_productions.append(production)
+            if production not in new_productions:
+                if str(production.lhs()) != "S" and "_TASK" not in str(production.lhs()):
+                    new_productions.append(production)

    task_grammar = CFG(start_token, new_productions)
    logger.debug(f'Task grammar: {task_grammar}')
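The rewritten filter keeps every production whose left-hand side is neither the start symbol `S` nor a `*_TASK` symbol, instead of chasing reachability through right-hand sides. A standalone sketch of the same rule over a toy NLTK grammar (the grammar text and the seeding of `new_productions` are assumptions for illustration; only the filter condition comes from this commit):

    from nltk import CFG
    from nltk.grammar import Nonterminal, Production

    global_grammar = CFG.fromstring("""
        S -> CLASSIFICATION_TASK | REGRESSION_TASK
        CLASSIFICATION_TASK -> IMPUTER CLASSIFIER
        REGRESSION_TASK -> IMPUTER REGRESSOR
        IMPUTER -> 'SimpleImputer'
        CLASSIFIER -> 'RandomForestClassifier'
        REGRESSOR -> 'Ridge'
    """)

    task = 'CLASSIFICATION'
    start_token = Nonterminal('S')
    # Seed with the task's own expansion, rewritten under the start symbol.
    task_prod = global_grammar.productions(lhs=Nonterminal(f'{task}_TASK'))[0]
    new_productions = [Production(start_token, task_prod.rhs())]

    # The commit's new rule: keep non-start, non-task productions.
    for production in global_grammar.productions():
        if production not in new_productions:
            if str(production.lhs()) != 'S' and '_TASK' not in str(production.lhs()):
                new_productions.append(production)

    task_grammar = CFG(start_token, new_productions)
    print(task_grammar)

Note that under this rule primitive productions from other tasks (here `REGRESSOR -> 'Ridge'`) are also retained; only the start and task-level symbols are dropped.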