diff --git a/llm4ad/method/meoh/meoh.py b/llm4ad/method/meoh/meoh.py index 4da7fd76..b8ede830 100644 --- a/llm4ad/method/meoh/meoh.py +++ b/llm4ad/method/meoh/meoh.py @@ -21,7 +21,6 @@ # # For inquiries regarding commercial use or licensing, please contact # http://www.llm4ad.com/contact.html -# -------------------------------------------------------------------------- from __future__ import annotations @@ -30,6 +29,7 @@ import time import traceback from threading import Thread +from typing import Optional, Literal import numpy as np from .population import Population @@ -47,9 +47,9 @@ def __init__(self, llm: LLM, evaluation: Evaluation, profiler: ProfilerBase = None, - max_generations: int | None = 10, - max_sample_nums: int | None = 100, - pop_size: int = 20, + max_generations: Optional[int] = 10, + max_sample_nums: Optional[int] = 100, + pop_size: Optional[int] = 5, selection_num=5, use_e2_operator: bool = True, use_m1_operator: bool = True, @@ -59,9 +59,9 @@ def __init__(self, num_objs: int = 2, *, resume_mode: bool = False, - initial_sample_num: int | None = None, + initial_sample_nums_max: int = 50, debug_mode: bool = False, - multi_thread_or_process_eval: str = 'thread', + multi_thread_or_process_eval: Literal['thread', 'process'] = 'thread', **kwargs): """ Args: @@ -82,7 +82,8 @@ def __init__(self, setting this parameter to 'process' will faster than 'thread'. However, I do not sure if this happens on all platform so I set the default to 'thread'. Please note that there is one case that cannot utilize multi-core CPU: if you set 'safe_evaluate' argument in 'evaluator' to 'False', and you set this argument to 'thread'. - **kwargs : some args pass to 'llm4ad.base.SecureEvaluator'. Such as 'fork_proc'. + initial_sample_nums_max : maximum samples restriction during initialization. + **kwargs : some args pass to 'llm4ad.base.SecureEvaluator'. Such as 'fork_proc'. """ self._template_program_str = evaluation.template_program self._task_description_str = evaluation.task_description @@ -94,11 +95,14 @@ def __init__(self, self._use_e2_operator = use_e2_operator self._use_m1_operator = use_m1_operator self._use_m2_operator = use_m2_operator + + # samplers and evaluators self._num_samplers = num_samplers self._num_evaluators = num_evaluators self._resume_mode = resume_mode - self._initial_sample_num = initial_sample_num + self._initial_sample_nums_max = initial_sample_nums_max self._debug_mode = debug_mode + llm.debug_mode = debug_mode self._multi_thread_or_process_eval = multi_thread_or_process_eval # function to be evolved @@ -106,9 +110,11 @@ def __init__(self, self._function_to_evolve_name: str = self._function_to_evolve.name self._template_program: Program = TextFunctionProgramConverter.text_to_program(self._template_program_str) + # adjust population size + self._adjust_pop_size() + # population, sampler, and evaluator self._population = Population(pop_size=self._pop_size) - llm.debug_mode = debug_mode self._sampler = MEoHSampler(llm, self._template_program_str) self._evaluator = SecureEvaluator(evaluation, debug_mode=debug_mode, **kwargs) self._profiler = profiler @@ -116,7 +122,13 @@ def __init__(self, self._profiler.record_parameters(llm, evaluation, self) # ZL: Necessary # statistics - self._tot_sample_nums = 0 if initial_sample_num is None else initial_sample_num + self._tot_sample_nums = 0 + + # reset _initial_sample_nums_max + self._initial_sample_nums_max = max( + self._initial_sample_nums_max, + 2 * self._pop_size + ) # multi-thread executor for evaluation assert multi_thread_or_process_eval in ['thread', 'process'] @@ -129,120 +141,122 @@ def __init__(self, max_workers=num_evaluators ) + # pass parameters to profiler + if profiler is not None: + self._profiler.record_parameters(llm, evaluation, self) # ZL: necessary + + def _adjust_pop_size(self): + # adjust population size + if self._max_sample_nums >= 10000: + if self._pop_size is None: + self._pop_size = 40 + elif abs(self._pop_size - 40) > 20: + print(f'Warning: population size {self._pop_size} ' + f'is not suitable, please reset it to 40.') + elif self._max_sample_nums >= 1000: + if self._pop_size is None: + self._pop_size = 20 + elif abs(self._pop_size - 20) > 10: + print(f'Warning: population size {self._pop_size} ' + f'is not suitable, please reset it to 20.') + elif self._max_sample_nums >= 200: + if self._pop_size is None: + self._pop_size = 10 + elif abs(self._pop_size - 10) > 5: + print(f'Warning: population size {self._pop_size} ' + f'is not suitable, please reset it to 10.') + else: + if self._pop_size is None: + self._pop_size = 5 + elif abs(self._pop_size - 5) > 5: + print(f'Warning: population size {self._pop_size} ' + f'is not suitable, please reset it to 5.') + def _sample_evaluate_register(self, prompt): - """Sample a function using the given prompt -> evaluate it by submitting to the process/thread pool -> - add the function to the population and register it to the profiler. + """Perform following steps: + 1. Sample an algorithm using the given prompt. + 2. Evaluate it by submitting to the process/thread pool, and get the results. + 3. Add the function to the population and register it to the profiler. """ sample_start = time.time() thought, func = self._sampler.get_thought_and_function(prompt) sample_time = time.time() - sample_start if thought is None or func is None: return - # convert to Program instance program = TextFunctionProgramConverter.function_to_program(func, self._template_program) if program is None: return - # evaluate score, eval_time = self._evaluation_executor.submit( self._evaluator.evaluate_program_record_time, program ).result() - - # score + # register to profiler func.score = score func.evaluate_time = eval_time func.algorithm = thought func.sample_time = sample_time - try: - if self._profiler is not None: - self._profiler.register_function(func, program=str(program)) - if isinstance(self._profiler, MEoHProfiler): - self._profiler.register_population(self._population) - self._tot_sample_nums += 1 - except Exception as e: - traceback.print_exc() + if self._profiler is not None: + self._profiler.register_function(func) + if isinstance(self._profiler, MEoHProfiler): + self._profiler.register_population(self._population) + self._tot_sample_nums += 1 # register to the population self._population.register_function(func) - def _continue_sample(self): - """Check if it meets the max_sample_nums restrictions. - """ + def _continue_loop(self) -> bool: if self._max_generations is None and self._max_sample_nums is None: return True - if self._max_generations is None and self._max_sample_nums is not None: - if self._tot_sample_nums < self._max_sample_nums: - return True - else: - return False - if self._max_generations is not None and self._max_sample_nums is None: - if self._population.generation < self._max_generations: - return True - else: - return False - if self._max_generations is not None and self._max_sample_nums is not None: - continue_until_reach_gen = False - continue_until_reach_sample = False - if self._population.generation < self._max_generations: - continue_until_reach_gen = True - if self._tot_sample_nums < self._max_sample_nums: - continue_until_reach_sample = True - return continue_until_reach_gen and continue_until_reach_sample + elif self._max_generations is not None and self._max_sample_nums is None: + return self._population.generation < self._max_generations + elif self._max_generations is None and self._max_sample_nums is not None: + return self._tot_sample_nums < self._max_sample_nums + else: + return (self._population.generation < self._max_generations + and self._tot_sample_nums < self._max_sample_nums) def _thread_do_evolutionary_operator(self): - while self._continue_sample(): + while self._continue_loop(): try: # get a new func using e1 indivs = [self._population.selection() for _ in range(self._selection_num)] prompt = MEoHPrompt.get_prompt_e1(self._task_description_str, indivs, self._function_to_evolve) - if self._debug_mode: - print(prompt) - input() - + print(f'E1 Prompt: {prompt}') self._sample_evaluate_register(prompt) - if not self._continue_sample(): + if not self._continue_loop(): break # get a new func using e2 if self._use_e2_operator: indivs = [self._population.selection() for _ in range(self._selection_num)] prompt = MEoHPrompt.get_prompt_e2(self._task_description_str, indivs, self._function_to_evolve) - if self._debug_mode: - print(prompt) - input() - + print(f'E2 Prompt: {prompt}') self._sample_evaluate_register(prompt) - if not self._continue_sample(): + if not self._continue_loop(): break # get a new func using m1 if self._use_m1_operator: indiv = self._population.selection() prompt = MEoHPrompt.get_prompt_m1(self._task_description_str, indiv, self._function_to_evolve) - if self._debug_mode: - print(prompt) - input() - + print(f'M1 Prompt: {prompt}') self._sample_evaluate_register(prompt) - if not self._continue_sample(): + if not self._continue_loop(): break # get a new func using m2 if self._use_m2_operator: indiv = self._population.selection() prompt = MEoHPrompt.get_prompt_m2(self._task_description_str, indiv, self._function_to_evolve) - if self._debug_mode: - print(prompt) - input() - + print(f'M2 Prompt: {prompt}') self._sample_evaluate_register(prompt) - if not self._continue_sample(): + if not self._continue_loop(): break except KeyboardInterrupt: break @@ -258,40 +272,29 @@ def _thread_do_evolutionary_operator(self): except: pass - def _thread_init_population(self): + def _iteratively_init_population(self): """Let a thread repeat {sample -> evaluate -> register to population} to initialize a population. """ while self._population.generation == 0: - if not self._continue_sample(): - break try: # get a new func using i1 prompt = MEoHPrompt.get_prompt_i1(self._task_description_str, self._function_to_evolve) self._sample_evaluate_register(prompt) - except Exception as e: + if self._tot_sample_nums > self._initial_sample_nums_max: + print(f'Warning: Initialization not accomplished in {self._initial_sample_nums_max} samples !!!') + break + except Exception: if self._debug_mode: traceback.print_exc() exit() continue - def _init_population(self): + def _multi_threaded_sampling(self, fn: callable, *args, **kwargs): # threads for sampling sampler_threads = [ - Thread( - target=self._thread_init_population, - ) for _ in range(self._num_samplers) - ] - for t in sampler_threads: - t.start() - for t in sampler_threads: - t.join() - - def _do_sample(self): - sampler_threads = [ - Thread( - target=self._thread_do_evolutionary_operator, - ) for _ in range(self._num_samplers) + Thread(target=fn, args=args, kwargs=kwargs) + for _ in range(self._num_samplers) ] for t in sampler_threads: t.start() @@ -300,17 +303,19 @@ def _do_sample(self): def run(self): if not self._resume_mode: - # do init - self._population = Population(pop_size=self._pop_size) - self._init_population() - while len([f for f in self._population if not np.isinf(np.array(f.score)).any()]) < self._selection_num: - self._population._generation -= 1 - self._init_population() + # do initialization + self._multi_threaded_sampling(self._iteratively_init_population) + # while len([f for f in self._population if not np.isinf(np.array(f.score)).any()]) < self._selection_num: + # self._population._generation -= 1 + # self._init_population() + if len(self._population) < self._selection_num: + print( + f'The search is terminated since MEoH unable to obtain {self._selection_num} feasible algorithms during initialization. ' + f'Please increase the `initial_sample_nums_max` argument (currently {self._initial_sample_nums_max}). ' + f'Please also check your evaluation implementation and LLM implementation.') + return # do evolve - self._do_sample() - + self._multi_threaded_sampling(self._thread_do_evolutionary_operator) # finish if self._profiler is not None: self._profiler.finish() - - self._sampler.llm.close() diff --git a/llm4ad/method/meoh/population.py b/llm4ad/method/meoh/population.py index d5de204f..1fc32588 100644 --- a/llm4ad/method/meoh/population.py +++ b/llm4ad/method/meoh/population.py @@ -47,40 +47,50 @@ def elitist(self): def generation(self): return self._generation + def survival(self): + pop = self._population + self._next_gen_pop + + # elitist + pop_elitist = pop + self._elitist + objs = [ind.score for ind in pop_elitist] + objs_array = -np.array(objs) + nondom_idx = NonDominatedSorting().do(objs_array, only_non_dominated_front=True) + self._elitist = [] + for idx in nondom_idx.tolist(): + self._elitist.append(pop_elitist[idx]) + + crt_pop_size = len(pop) + dominated_counts = np.zeros((crt_pop_size, crt_pop_size)) + for i in range(crt_pop_size): + for j in range(i + 1, crt_pop_size): + if (np.array(pop[i].score) >= np.array(pop[j].score)).all(): + dominated_counts[i, j] = -calc_syntax_match([str(pop[i])], str(pop[j]), 'python') + # dominated_counts[i, j] = -calc_syntax_match([pop[i].entire_code], pop[j].entire_code, 'python') + elif (np.array(pop[j].score) >= np.array(pop[i].score)).all(): + dominated_counts[j, i] = -calc_syntax_match([str(pop[j])], str(pop[i]), 'python') + # dominated_counts[j, i] = -calc_syntax_match([pop[j].entire_code], pop[i].entire_code, 'python') + dominated_counts_ = dominated_counts.sum(0) + self._population = [pop[i] for i in np.argsort(-dominated_counts_)[:self._pop_size // 4]] # minus for descending, //4 for keep the original pop_size + self._next_gen_pop = [] + self._generation += 1 + def register_function(self, func: Function): - # we only accept valid functions - if func.score is None: + # in population initialization, we only accept valid functions + if self._generation == 0 and func.score is None: return + # if the score is None, we still put it into the population, + # we set the score to '-inf' + if func.score is None: + func.score = np.array([float('-inf'), float('-inf')]) try: self._lock.acquire() + if self.has_duplicate_function(func): + func.score = np.array([float('-inf'), float('-inf')]) # register to next_gen - if not self.has_duplicate_function(func): - self._next_gen_pop.append(func) - + self._next_gen_pop.append(func) # update: perform survival if reach the pop size - if len(self._next_gen_pop) >= self._pop_size or (len(self._next_gen_pop) >= self._pop_size // 4 and self._generation == 0): - pop = self._population + self._next_gen_pop - - pop_elitist = pop + self._elitist - objs = [ind.score for ind in pop_elitist] - objs_array = -np.array(objs) - nondom_idx = NonDominatedSorting().do(objs_array, only_non_dominated_front=True) - self._elitist = [] - for idx in nondom_idx.tolist(): - self._elitist.append(pop_elitist[idx]) - - crt_pop_size = len(pop) - dominated_counts = np.zeros((crt_pop_size, crt_pop_size)) - for i in range(crt_pop_size): - for j in range(i + 1, crt_pop_size): - if (np.array(pop[i].score) >= np.array(pop[j].score)).all(): - dominated_counts[i, j] = -calc_syntax_match([pop[i].entire_code], pop[j].entire_code, 'python') - elif (np.array(pop[j].score) >= np.array(pop[i].score)).all(): - dominated_counts[j, i] = -calc_syntax_match([pop[j].entire_code], pop[i].entire_code, 'python') - dominated_counts_ = dominated_counts.sum(0) - self._population = [pop[i] for i in np.argsort(-dominated_counts_)[:self._pop_size // 5]] # minus for descending, //5 for keep the original pop_size - self._next_gen_pop = [] - self._generation += 1 + if len(self._next_gen_pop) >= self._pop_size: + self.survival() except Exception as e: traceback.print_exc() return @@ -88,28 +98,12 @@ def register_function(self, func: Function): self._lock.release() def has_duplicate_function(self, func: str | Function) -> bool: - if func.score is None: - return True - - for i in range(len(self._population)): - f = self._population[i] - if str(f) == str(func): - if func.score[0] > f.score[0]: - self._population[i] = func - return True - if func.score[0] == f.score[0] and func.score[1] > f.score[1]: - self._population[i] = func - return True - - for i in range(len(self._next_gen_pop)): - f = self._next_gen_pop[i] - if str(f) == str(func): - if func.score[0] > f.score[0]: - self._next_gen_pop[i] = func - return True - if func.score[0] == f.score[0] and func.score[1] > f.score[1]: - self._next_gen_pop[i] = func - return True + for f in self._population: + if str(f) == str(func) or (func.score[0] == f.score[0] and func.score[1] <= f.score[1]): + return True + for f in self._next_gen_pop: + if str(f) == str(func) or (func.score[0] == f.score[0] and func.score[1] <= f.score[1]): + return True return False def selection(self) -> Function: @@ -123,9 +117,9 @@ def selection(self) -> Function: for i in range(crt_pop_size): for j in range(i + 1, crt_pop_size): if (np.array(funcs[i].score) >= np.array(funcs[j].score)).all(): - dominated_counts[i, j] = -calc_syntax_match([funcs[i].entire_code], funcs[j].entire_code, 'python') + dominated_counts[i, j] = -calc_syntax_match([str(funcs[i])], str(funcs[j]), 'python') elif (np.array(funcs[j].score) >= np.array(funcs[i].score)).all(): - dominated_counts[j, i] = -calc_syntax_match([funcs[j].entire_code], funcs[i].entire_code, 'python') + dominated_counts[j, i] = -calc_syntax_match([str(funcs[j])], str(funcs[i]), 'python') dominated_counts_ = dominated_counts.sum(0) p = np.exp(dominated_counts_) / np.exp(dominated_counts_).sum() diff --git a/llm4ad/method/meoh/profiler.py b/llm4ad/method/meoh/profiler.py index 45f7e338..2ab794fa 100644 --- a/llm4ad/method/meoh/profiler.py +++ b/llm4ad/method/meoh/profiler.py @@ -19,20 +19,26 @@ class MEoHProfiler(ProfilerBase): + _cur_gen = 0 def __init__(self, log_dir: str | None = None, + evaluation_name='Problem', + method_name='MEoH', num_objs=2, *, initial_num_samples=0, log_style='complex', + create_random_path=True, **kwargs): - super().__init__(log_dir=log_dir, + super().__init__(evaluation_name=evaluation_name, + method_name=method_name, + log_dir=log_dir, initial_num_samples=initial_num_samples, log_style=log_style, + create_random_path=create_random_path, num_objs=num_objs, **kwargs) - self._cur_gen = 0 self._pop_lock = Lock() if self._log_dir: self._ckpt_dir = os.path.join(self._log_dir, 'population') @@ -43,8 +49,8 @@ def __init__(self, def register_population(self, pop: Population): try: self._pop_lock.acquire() - if (self._num_samples == 0 or - pop.generation == self._cur_gen): + if (self.__class__._num_samples == 0 or + pop.generation == self.__class__._cur_gen): return funcs = pop.population # type: List[Function] funcs_json = [] # type: List[Dict] @@ -83,12 +89,12 @@ def register_population(self, pop: Population): path = os.path.join(self._elitist_dir, f'elitist_{pop.generation}.json') with open(path, 'w') as json_file: json.dump(funcs_json, json_file, indent=4) - self._cur_gen += 1 + self.__class__._cur_gen += 1 finally: if self._pop_lock.locked(): self._pop_lock.release() - def _write_json(self, function: Function, program='', *, record_type='history', record_sep=200): + def _write_json(self, function: Function, *, record_type='history', record_sep=200): """ Write function data to a JSON file. @@ -102,7 +108,7 @@ def _write_json(self, function: Function, program='', *, record_type='history', if not self._log_dir: return - sample_order = self._num_samples + sample_order = getattr(self.__class__, '_num_samples', 0) func_score = function.score if function.score is not None: if np.isinf(np.array(function.score)).any(): @@ -116,8 +122,7 @@ def _write_json(self, function: Function, program='', *, record_type='history', 'sample_order': sample_order, 'algorithm': function.algorithm, # Added when recording 'function': str(function), - 'score': func_score, - 'program': program, + 'score': func_score } if record_type == 'history': @@ -144,13 +149,30 @@ class MEoHTensorboardProfiler(TensorboardProfiler, MEoHProfiler): def __init__(self, log_dir: str | None = None, + evaluation_name='Problem', + method_name='MEoH', *, initial_num_samples=0, log_style='complex', + create_random_path=True, **kwargs): - MEoHProfiler.__init__(self, log_dir=log_dir, **kwargs) - TensorboardProfiler.__init__(self, log_dir=log_dir, initial_num_samples=initial_num_samples, - log_style=log_style, **kwargs) + MEoHProfiler.__init__( + self, log_dir=log_dir, + evaluation_name=evaluation_name, + create_random_path=create_random_path, + method_name=method_name, + **kwargs + ) + TensorboardProfiler.__init__( + self, + log_dir=log_dir, + evaluation_name=evaluation_name, + method_name=method_name, + initial_num_samples=initial_num_samples, + log_style=log_style, + create_random_path=create_random_path, + **kwargs + ) def finish(self): if self._log_dir: @@ -164,20 +186,37 @@ def finish(self): class MEoHWandbProfiler(WandBProfiler, MEoHProfiler): + _cur_gen = 0 def __init__(self, wandb_project_name: str, log_dir: str | None = None, + evaluation_name="Problem", + method_name="MEoH", *, initial_num_samples=0, log_style='complex', + create_random_path=True, **kwargs): - MEoHProfiler.__init__(self, log_dir=log_dir, **kwargs) - WandBProfiler.__init__(self, - wandb_project_name=wandb_project_name, - log_dir=log_dir, - initial_num_samples=initial_num_samples, - log_style=log_style, **kwargs) + MEoHProfiler.__init__( + self, + log_dir=log_dir, + evaluation_name=evaluation_name, + create_random_path=create_random_path, + method_name=method_name, + **kwargs + ) + WandBProfiler.__init__( + self, + wandb_project_name=wandb_project_name, + log_dir=log_dir, + evaluation_name=evaluation_name, + method_name=method_name, + initial_num_samples=initial_num_samples, + log_style=log_style, + create_random_path=create_random_path, + **kwargs + ) self._pop_lock = Lock() if self._log_dir: self._ckpt_dir = os.path.join(self._log_dir, 'population')