WIP: Replace MCTS with Ray RL Algorithms #95

Merged: 40 commits (May 1, 2024)

Commits
3a20a7e
rllib base, add json save and tuned model train, add python executabl…
EdenWuyifan Mar 8, 2024
fbc1b9a
fix grammar
EdenWuyifan Mar 8, 2024
959180f
tmp
EdenWuyifan Mar 12, 2024
99a1053
tmp add early stopping
EdenWuyifan Mar 14, 2024
a352107
tmp
EdenWuyifan Mar 27, 2024
46b9945
update json path
EdenWuyifan Mar 29, 2024
661b9e5
fix RlLib for some cases
EdenWuyifan Apr 1, 2024
333a849
update AMLB scripts
EdenWuyifan Apr 1, 2024
f293c85
for test
EdenWuyifan Apr 5, 2024
eacf2bb
tmp
EdenWuyifan Apr 5, 2024
23c7354
tmp
EdenWuyifan Apr 5, 2024
ef1debf
tmp
EdenWuyifan Apr 8, 2024
2617452
tmpppp
EdenWuyifan Apr 8, 2024
a706691
ipynb temp archive
EdenWuyifan Apr 8, 2024
439dbfe
debug
EdenWuyifan Apr 12, 2024
162bd45
update game env and configs, limit results to 20
EdenWuyifan Apr 17, 2024
8eb424c
debug
EdenWuyifan Apr 17, 2024
db202c2
debug
EdenWuyifan Apr 17, 2024
70fccd4
seperate results
EdenWuyifan Apr 17, 2024
64deb84
resample when evaluation
EdenWuyifan Apr 22, 2024
c96aa6d
update random seed to datetime milliseconds
EdenWuyifan Apr 23, 2024
cec8c82
Refactor
roquelopez Apr 24, 2024
fbf5f16
Change module names
roquelopez Apr 24, 2024
982d4e5
Remove old modules
roquelopez Apr 24, 2024
0c7b0e5
Refactor: remove deprecated methods
roquelopez Apr 24, 2024
1d9810e
Remove player param
roquelopez Apr 25, 2024
dafa7b2
add scoring metric as metafeature
EdenWuyifan Apr 25, 2024
aa640e5
Allow user to specify the checkpoints folder
roquelopez Apr 26, 2024
dce07c6
Minor
roquelopez Apr 26, 2024
19e4f8b
Refactor output folder
EdenWuyifan Apr 26, 2024
aebf4e5
add offline training scripts
EdenWuyifan Apr 26, 2024
9d3e490
lint
EdenWuyifan Apr 27, 2024
e44a7b8
add documentations
EdenWuyifan Apr 27, 2024
7e76fcb
Restructure scripts
roquelopez Apr 28, 2024
a5f0750
Fix params
roquelopez Apr 28, 2024
e9c86b4
Minor
roquelopez Apr 29, 2024
a6871be
Fix loading checkpoints
roquelopez Apr 29, 2024
6bab136
Disable verbose param for multiprocessing
roquelopez Apr 30, 2024
852e3b3
make action space varies
EdenWuyifan May 1, 2024
7e7b726
add checkpoint folder
EdenWuyifan May 1, 2024
3 changes: 3 additions & 0 deletions .gitignore
@@ -60,3 +60,6 @@ lightning_logs
.cache
**/smac3_output
/venv

.vscode
**/.vscode
47 changes: 30 additions & 17 deletions alpha_automl/automl_api.py
@@ -24,7 +24,7 @@ class BaseAutoML():

def __init__(self, time_bound=15, metric=None, split_strategy='holdout', time_bound_run=5, task=None,
score_sorting='auto', metric_kwargs=None, split_strategy_kwargs=None, output_folder=None,
num_cpus=None, start_mode='auto', verbose=logging.INFO):
checkpoints_folder=None, num_cpus=None, start_mode='auto', verbose=logging.INFO):
"""
Create/instantiate a BaseAutoML object.

@@ -39,6 +39,8 @@ def __init__(self, time_bound=15, metric=None, split_strategy='holdout', time_bo
:param metric_kwargs: Additional arguments for metric.
:param split_strategy_kwargs: Additional arguments for splitting_strategy.
:param output_folder: Path to the output directory. If it is None, create a temp folder automatically.
:param checkpoints_folder: Path to the directory to load and save the checkpoints. If it is None,
it will use the default checkpoints and save the new checkpoints in output_folder.
:param num_cpus: Number of CPUs to be used.
:param start_mode: The mode to start the multiprocessing library. It could be `auto`, `fork` or `spawn`.
:param verbose: The logs level.
@@ -60,11 +62,11 @@ def __init__(self, time_bound=15, metric=None, split_strategy='holdout', time_bo
self.X = None
self.y = None
self.leaderboard = None
self.automl_manager = AutoMLManager(self.output_folder, time_bound, time_bound_run, task, num_cpus, verbose)
self._start_method = get_start_method(start_mode)
set_start_method(self._start_method, force=True)
check_input_for_multiprocessing(self._start_method, self.scorer._score_func, 'metric')
check_input_for_multiprocessing(self._start_method, self.splitter, 'split strategy')
self.automl_manager = AutoMLManager(self.output_folder, checkpoints_folder, time_bound, time_bound_run, task, num_cpus, verbose)
#self._start_method = get_start_method(start_mode)
#set_start_method(self._start_method, force=True)
#check_input_for_multiprocessing(self._start_method, self.scorer._score_func, 'metric')
#check_input_for_multiprocessing(self._start_method, self.splitter, 'split strategy')
self.label_encoder = None
self.task_type = task

@@ -194,7 +196,7 @@ def add_primitives(self, new_primitives):
:param new_primitives: Set of new primitives, tuples of name and object primitive
"""
for primitive_object, primitive_type in new_primitives:
check_input_for_multiprocessing(self._start_method, primitive_object, 'primitive')
#check_input_for_multiprocessing(self._start_method, primitive_object, 'primitive')
primitive_name = f'{primitive_object.__module__}.{primitive_object.__class__.__name__}'
primitive_name = primitive_name.replace('__', '') # Sklearn restriction on estimator names
self.new_primitives[primitive_name] = {'primitive_object': primitive_object,
@@ -297,7 +299,7 @@ class ClassifierBaseAutoML(BaseAutoML):

def __init__(self, time_bound=15, metric='accuracy_score', split_strategy='holdout', time_bound_run=5, task=None,
score_sorting='auto', metric_kwargs=None, split_strategy_kwargs=None, output_folder=None,
num_cpus=None, start_mode='auto', verbose=logging.INFO):
checkpoints_folder=None, num_cpus=None, start_mode='auto', verbose=logging.INFO):
"""
Create/instantiate an AutoMLClassifier object.

@@ -312,13 +314,15 @@ def __init__(self, time_bound=15, metric='accuracy_score', split_strategy='holdo
:param metric_kwargs: Additional arguments for metric.
:param split_strategy_kwargs: Additional arguments for splitting_strategy.
:param output_folder: Path to the output directory. If it is None, create a temp folder automatically.
:param checkpoints_folder: Path to the directory to load and save the checkpoints. If it is None,
it will use the default checkpoints and save the new checkpoints in output_folder.
:param num_cpus: Number of CPUs to be used.
:param start_mode: The mode to start the multiprocessing library. It could be `auto`, `fork` or `spawn`.
:param verbose: The logs level.
"""

super().__init__(time_bound, metric, split_strategy, time_bound_run, task, score_sorting, metric_kwargs,
split_strategy_kwargs, output_folder, num_cpus, start_mode, verbose)
split_strategy_kwargs, output_folder, checkpoints_folder, num_cpus, start_mode, verbose)

self.label_encoder = LabelEncoder()

@@ -354,7 +358,7 @@ class AutoMLClassifier(ClassifierBaseAutoML):

def __init__(self, time_bound=15, metric='accuracy_score', split_strategy='holdout', time_bound_run=5,
score_sorting='auto', metric_kwargs=None, split_strategy_kwargs=None, output_folder=None,
num_cpus=None, start_mode='auto', verbose=logging.INFO):
checkpoints_folder=None, num_cpus=None, start_mode='auto', verbose=logging.INFO):
"""
Create/instantiate an AutoMLClassifier object.

@@ -368,21 +372,23 @@ def __init__(self, time_bound=15, metric='accuracy_score', split_strategy='holdo
:param metric_kwargs: Additional arguments for metric.
:param split_strategy_kwargs: Additional arguments for splitting_strategy.
:param output_folder: Path to the output directory. If it is None, create a temp folder automatically.
:param checkpoints_folder: Path to the directory to load and save the checkpoints. If it is None,
it will use the default checkpoints and save the new checkpoints in output_folder.
:param num_cpus: Number of CPUs to be used.
:param start_mode: The mode to start the multiprocessing library. It could be `auto`, `fork` or `spawn`.
:param verbose: The logs level.
"""

task = 'CLASSIFICATION'
super().__init__(time_bound, metric, split_strategy, time_bound_run, task, score_sorting, metric_kwargs,
split_strategy_kwargs, output_folder, num_cpus, start_mode, verbose)
split_strategy_kwargs, output_folder, checkpoints_folder, num_cpus, start_mode, verbose)


class AutoMLRegressor(BaseAutoML):

def __init__(self, time_bound=15, metric='mean_absolute_error', split_strategy='holdout', time_bound_run=5,
score_sorting='auto', metric_kwargs=None, split_strategy_kwargs=None, output_folder=None,
num_cpus=None, start_mode='auto', verbose=logging.INFO):
checkpoints_folder=None, num_cpus=None, start_mode='auto', verbose=logging.INFO):
"""
Create/instantiate an AutoMLRegressor object.

@@ -396,20 +402,23 @@ def __init__(self, time_bound=15, metric='mean_absolute_error', split_strategy='
:param metric_kwargs: Additional arguments for metric.
:param split_strategy_kwargs: Additional arguments for splitting_strategy.
:param output_folder: Path to the output directory. If it is None, create a temp folder automatically.
:param checkpoints_folder: Path to the directory to load and save the checkpoints. If it is None,
it will use the default checkpoints and save the new checkpoints in output_folder.
:param num_cpus: Number of CPUs to be used.
:param start_mode: The mode to start the multiprocessing library. It could be `auto`, `fork` or `spawn`.
:param verbose: The logs level.
"""

task = 'REGRESSION'
super().__init__(time_bound, metric, split_strategy, time_bound_run, task, score_sorting, metric_kwargs,
split_strategy_kwargs, output_folder, num_cpus, start_mode, verbose)
split_strategy_kwargs, output_folder, checkpoints_folder, num_cpus, start_mode, verbose)


class AutoMLTimeSeries(BaseAutoML):
def __init__(self, time_bound=15, metric='mean_squared_error', split_strategy='timeseries', time_bound_run=5,
score_sorting='auto', metric_kwargs=None, split_strategy_kwargs=None, output_folder=None,
num_cpus=None, start_mode='auto', verbose=logging.INFO, date_column=None, target_column=None):
checkpoints_folder=None, num_cpus=None, start_mode='auto', verbose=logging.INFO, date_column=None,
target_column=None):
"""
Create/instantiate an AutoMLTimeSeries object.

@@ -423,6 +432,8 @@ def __init__(self, time_bound=15, metric='mean_squared_error', split_strategy='t
:param metric_kwargs: Additional arguments for metric.
:param split_strategy_kwargs: Additional arguments for TimeSeriesSplit, E.g. n_splits and test_size(int).
:param output_folder: Path to the output directory. If it is None, create a temp folder automatically.
:param checkpoints_folder: Path to the directory to load and save the checkpoints. If it is None,
it will use the default checkpoints and save the new checkpoints in output_folder.
:param num_cpus: Number of CPUs to be used.
:param start_mode: The mode to start the multiprocessing library. It could be `auto`, `fork` or `spawn`.
:param verbose: The logs level.
@@ -433,7 +444,7 @@ def __init__(self, time_bound=15, metric='mean_squared_error', split_strategy='t
self.target_column = target_column

super().__init__(time_bound, metric, split_strategy, time_bound_run, task, score_sorting, metric_kwargs,
split_strategy_kwargs, output_folder, num_cpus, start_mode, verbose)
split_strategy_kwargs, output_folder, checkpoints_folder, num_cpus, start_mode, verbose)

def _column_parser(self, X):
cols = list(X.columns.values)
@@ -452,7 +463,7 @@ class AutoMLSemiSupervisedClassifier(ClassifierBaseAutoML):

def __init__(self, time_bound=15, metric='accuracy_score', split_strategy='holdout', time_bound_run=5,
score_sorting='auto', metric_kwargs=None, split_strategy_kwargs=None, output_folder=None,
num_cpus=None, start_mode='auto', verbose=logging.INFO):
checkpoints_folder=None, num_cpus=None, start_mode='auto', verbose=logging.INFO):
"""
Create/instantiate an AutoMLSemiSupervisedClassifier object.

@@ -467,14 +478,16 @@ def __init__(self, time_bound=15, metric='accuracy_score', split_strategy='holdo
:param split_strategy_kwargs: Additional arguments for splitting_strategy. In SemiSupervised case, `n_splits`
and `test_size` (test proportion from 0 to 1) can be passed to the splitter.
:param output_folder: Path to the output directory. If it is None, create a temp folder automatically.
:param checkpoints_folder: Path to the directory to load and save the checkpoints. If it is None,
it will use the default checkpoints and save the new checkpoints in output_folder.
:param num_cpus: Number of CPUs to be used.
:param start_mode: The mode to start the multiprocessing library. It could be `auto`, `fork` or `spawn`.
:param verbose: The logs level.
"""

task = 'SEMISUPERVISED'
super().__init__(time_bound, metric, split_strategy, time_bound_run, task, score_sorting, metric_kwargs,
split_strategy_kwargs, output_folder, num_cpus, start_mode, verbose)
split_strategy_kwargs, output_folder, checkpoints_folder, num_cpus, start_mode, verbose)

if split_strategy_kwargs is None:
split_strategy_kwargs = {'test_size': 0.25}
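For context, here is a minimal usage sketch of the new `checkpoints_folder` argument; the dataset file, target column, and package-level import are illustrative assumptions, while the constructor signature follows the diff above:

import logging
import pandas as pd
from alpha_automl import AutoMLClassifier  # assumed package-level export

df = pd.read_csv('dataset.csv')  # hypothetical input file
X, y = df.drop(columns=['target']), df['target']

automl = AutoMLClassifier(
    time_bound=15,                        # total search budget, in minutes
    metric='accuracy_score',
    output_folder='output',
    checkpoints_folder='rl_checkpoints',  # load/save RL checkpoints here; None loads
                                          # the bundled defaults and saves new
                                          # checkpoints in output_folder
    verbose=logging.INFO,
)
automl.fit(X, y)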
106 changes: 27 additions & 79 deletions alpha_automl/automl_manager.py
@@ -1,7 +1,7 @@
import logging
import time
import multiprocessing
import queue as Q
from alpha_automl.pipeline import Pipeline
from alpha_automl.data_profiler import profile_data
from alpha_automl.scorer import make_splitter, score_pipeline
from alpha_automl.utils import sample_dataset, is_equal_splitting
@@ -22,8 +22,9 @@

class AutoMLManager():

def __init__(self, output_folder, time_bound, time_bound_run, task, num_cpus, verbose):
def __init__(self, output_folder, checkpoints_folder, time_bound, time_bound_run, task, num_cpus, verbose):
self.output_folder = output_folder
self.checkpoints_folder = checkpoints_folder
self.time_bound = time_bound * 60
self.time_bound_run = time_bound_run * 60
self.task = task
@@ -60,86 +61,33 @@ def _search_pipelines(self, automl_hyperparams):
if not is_sample and is_equal_splitting(internal_splitting_strategy, self.splitting_strategy):
need_rescoring = False

queue = multiprocessing.Queue()
search_process = multiprocessing.Process(target=search_pipelines_proc,
args=(X, y, self.scoring, internal_splitting_strategy, self.task,
automl_hyperparams, metadata, self.output_folder, self.verbose,
queue
)
)

search_process.start()
self.running_processes += 1
num_processes = self.num_cpus - 2 # Exclude the main process and search process
scoring_pool = multiprocessing.Pool(max(1, num_processes))
pipelines_to_score = []
scoring_results = []

while True:
pipelines = search_pipelines_proc(X, y, self.scoring, internal_splitting_strategy, self.task,
self.time_bound, automl_hyperparams, metadata, self.output_folder,
self.checkpoints_folder)

found_pipelines = 0

pipeline_threshold = 20
X, y, _ = sample_dataset(self.X, self.y, SAMPLE_SIZE, self.task)
while pipelines and found_pipelines < pipeline_threshold:
pipeline = pipelines.pop()
try:
result = queue.get(timeout=10)
except Q.Empty:
logger.debug('Reached timeout getting new pipelines')
result = None

if result == 'DONE':
search_process.terminate()
search_process.join(10)
scoring_pool.terminate()
scoring_pool.join()
logger.debug(f'Found {self.found_pipelines} pipelines')
logger.debug('Search done')
break

elif result is not None:
pipeline = result
logger.debug('Found new pipeline')
yield {'pipeline': pipeline, 'message': 'FOUND'}

if need_rescoring:
pipelines_to_score.append(pipeline)
else:
logger.debug(f'Pipeline scored successfully, score={pipeline.get_score()}')
self.found_pipelines += 1
yield {'pipeline': pipeline, 'message': 'SCORED'}

if len(pipelines_to_score) > 0:
if self.running_processes < MAX_RUNNING_PROCESSES:
pipeline = pipelines_to_score.pop(0).get_pipeline()
scoring_result = scoring_pool.apply_async(
score_pipeline,
args=(pipeline, self.X, self.y, self.scoring, self.splitting_strategy, self.task, self.verbose)
)
scoring_results.append(scoring_result)
self.running_processes += 1

tmp_scoring_results = []
for scoring_result in scoring_results:
if scoring_result.ready():
self.running_processes -= 1
pipeline = scoring_result.get()
if pipeline is not None:
logger.debug(f'Pipeline scored successfully, score={pipeline.get_score()}')
self.found_pipelines += 1
yield {'pipeline': pipeline, 'message': 'SCORED'}
else:
tmp_scoring_results.append(scoring_result)

scoring_results = tmp_scoring_results

if time.time() > search_start_time + self.time_bound:
logger.debug('Reached search timeout')
search_process.terminate()
search_process.join(10)
scoring_pool.terminate()
scoring_pool.join()
logger.debug(f'Found {self.found_pipelines} pipelines')
break
alphaautoml_pipeline = score_pipeline(pipeline, X, y, self.scoring, self.splitting_strategy, self.task)

if alphaautoml_pipeline is not None:
score = alphaautoml_pipeline.get_score()
logger.debug(f'Pipeline scored successfully, score={score}')
found_pipelines += 1
yield {'pipeline': alphaautoml_pipeline, 'message': 'SCORED'}
except:
logger.debug(f'Pipeline scoring error!')
continue

logger.debug(f'Found {found_pipelines} pipelines')
logger.debug('Search done')

def check_automl_hyperparams(self, automl_hyperparams):
if 'use_automatic_grammar' not in automl_hyperparams:
automl_hyperparams['use_automatic_grammar'] = USE_AUTOMATIC_GRAMMAR

def check_automl_hyperparams(self, automl_hyperparams):
if 'prioritize_primitives' not in automl_hyperparams:
automl_hyperparams['prioritize_primitives'] = PRIORITIZE_PRIMITIVES

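The net effect of this hunk: the queue-based search process and scoring pool are gone, and the manager now runs synchronously — the RL search returns a batch of candidate pipelines, and at most 20 of them (`pipeline_threshold`) are scored in-process on a resampled split. The new control flow reduces to this pattern (a standalone distillation, not the module's code; `search` and `score` stand in for `search_pipelines_proc` and `score_pipeline`):

def search_top_k(search, score, k=20):
    # Ask the RL search for candidates once, then score at most k of them.
    pipelines = search()
    found = 0
    while pipelines and found < k:
        pipeline = pipelines.pop()
        try:
            scored = score(pipeline)
        except Exception:
            continue  # mirror the diff: skip pipelines that fail scoring
        if scored is not None:
            found += 1
            yield scored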
5 changes: 3 additions & 2 deletions alpha_automl/grammar_loader.py
@@ -48,8 +48,9 @@ def create_task_grammar(global_grammar, task):

for production in global_grammar.productions():
for new_production in new_productions:
if production.lhs() in new_production.rhs() and production not in new_productions:
new_productions.append(production)
if production not in new_productions:
if str(production.lhs()) != "S" and "_TASK" not in str(production.lhs()):
new_productions.append(production)

task_grammar = CFG(start_token, new_productions)
logger.debug(f'Task grammar: {task_grammar}')
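As a toy illustration of the new filtering rule: the task grammar now keeps every global production whose left-hand side is neither the start symbol `S` nor a task-specific `*_TASK` symbol, instead of walking productions reachable from the task rules. A sketch using the nltk `CFG` API that this module builds on; the grammar below is invented:

from nltk import CFG

global_grammar = CFG.fromstring("""
    S -> CLASSIFICATION_TASK | REGRESSION_TASK
    CLASSIFICATION_TASK -> IMPUTER CLASSIFIER
    REGRESSION_TASK -> IMPUTER REGRESSOR
    IMPUTER -> 'SimpleImputer'
    CLASSIFIER -> 'RandomForestClassifier'
    REGRESSOR -> 'Ridge'
""")

# Keep the shared component productions; the task-level rules are rebuilt separately.
kept = [p for p in global_grammar.productions()
        if str(p.lhs()) != 'S' and '_TASK' not in str(p.lhs())]
for p in kept:
    print(p)  # IMPUTER -> 'SimpleImputer', CLASSIFIER -> ..., REGRESSOR -> ...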