geneticalgorithm2.geneticalgorithm2
from typing import Callable, List, Tuple, Optional, Dict, Any, Union, Sequence, Literal, Iterable
from typing_extensions import TypeAlias

import collections
import warnings
import operator

import sys
import time
import random
import math

import numpy as np

from OppOpPopInit.initialiser import CreatorFunc
from OppOpPopInit.oppositor import OppositorFunc

#region INTERNAL IMPORTS

from .utils.aliases import array1D, array2D

from .data_types.aliases import FunctionToMinimize, SetFunctionToMinimize
from .data_types.algorithm_params import AlgorithmParams
from .data_types.generation import GenerationConvertible, Generation
from .data_types.result import GAResult

from .population_initializer import get_population_initializer, PopulationModifier
from .utils.plotting import plot_pop_scores, plot_several_lines

from .utils.funcs import can_be_prob, is_numpy, is_current_gen_number, fast_min, random_indexes_pair

from .callbacks.data import MiddleCallbackData
from .callbacks import MiddleCallbackFunc, SimpleCallbackFunc

#endregion

#region ALIASES

VARIABLE_TYPE: TypeAlias = Literal['int', 'real', 'bool']
"""
the variable type for one or all dimensions; determines the values' discreteness:

    real: double numbers

    int: integer numbers only

    bool: in fact an integer with bounds [0, 1]
"""

#endregion


class GeneticAlgorithm2:
    """
    Genetic algorithm optimization process
    """

    default_params = AlgorithmParams()
    PROGRESS_BAR_LEN = 20
    """max count of symbols in the progress bar"""

    @property
    def output_dict(self):
        warnings.warn(
            "'output_dict' is deprecated and will be removed at version 7 \n"
            "use 'result' instead"
        )
        return self.result

    @property
    def needs_mutation(self) -> bool:
        """whether the mutation is required"""
        return self.prob_mut > 0 or self.prob_mut_discrete > 0

    #region INIT
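    # Example (illustrative sketch, not part of the original source): constructing the optimizer.
    # It assumes the class is importable from the package root and uses a toy objective `sphere`.
    #
    #   import numpy as np
    #   from geneticalgorithm2 import GeneticAlgorithm2
    #
    #   def sphere(x: np.ndarray) -> float:
    #       return float(np.sum(x ** 2))
    #
    #   model = GeneticAlgorithm2(
    #       dimension=3,
    #       variable_type=('real', 'real', 'int'),            # mixed variable types
    #       variable_boundaries=[(-5, 5), (-5, 5), (0, 10)],  # one (low, high) pair per variable
    #       algorithm_parameters={'max_num_iteration': 300, 'population_size': 50}
    #   )
    #   result = model.run(function=sphere)  # `function` belongs to run(), not to __init__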
    def __init__(
        self,
        function: FunctionToMinimize = None,

        dimension: int = 0,
        variable_type: Union[VARIABLE_TYPE, Sequence[VARIABLE_TYPE]] = 'bool',
        variable_boundaries: Optional[Union[array2D, Sequence[Tuple[float, float]]]] = None,

        variable_type_mixed=None,

        function_timeout: Optional[float] = None,
        algorithm_parameters: Union[AlgorithmParams, Dict[str, Any]] = default_params
    ):
        """
        initializes the GA object and performs main checks

        Args:
            function: the given objective function to be minimized -- deprecated and moved to run() method
            dimension: the number of decision variables, the population samples dimension

            variable_type: string means the variable type for all variables;
                for mixed types use a sequence of strings with the type of each variable

            variable_boundaries: leave it None if variable_type is 'bool';
                otherwise provide a sequence of tuples of length two as boundaries for each variable;
                the length of the sequence must be equal to dimension.
                For example, ([0, 100], [0, 200]) determines
                    lower boundary 0 and upper boundary 100 for the first variable,
                    lower boundary 0 and upper boundary 200 for the second variable,
                    and dimension must be 2.

            variable_type_mixed -- deprecated

            function_timeout: if the given function does not provide
                output before function_timeout (unit is seconds) the algorithm raises an error.
                For example, when there is an infinite loop in the given function.
                `None` means disabling -- deprecated and moved to run()

            algorithm_parameters: AlgorithmParams object or usual dictionary with algorithm parameters;
                it is not mandatory to provide all possible parameters

        Notes:
            - This implementation minimizes the given objective function.
                For maximization you can multiply the function by -1 (for instance): the absolute
                value of the output would be the actual objective function

        for more details and examples of implementation please visit:
            https://github.com/PasaOpasen/geneticalgorithm2

        """

        # all default fields

        # self.crossover: Callable[[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray]] = None
        # self.real_mutation: Callable[[float, float, float], float] = None
        # self.discrete_mutation: Callable[[int, int, int], int] = None
        # self.selection: Callable[[np.ndarray, int], np.ndarray] = None

        self.result = None
        self.best_function = None

        self.revolution_oppositor = None
        self.dup_oppositor = None
        self.creator = None
        self.init_oppositors = None

        self.f: Callable[[array1D], float] = None
        self.funtimeout: float = None

        self.set_function: Callable[[np.ndarray], np.ndarray] = None

        # self.dim: int = None
        self.var_bounds: List[Tuple[Union[int, float], Union[int, float]]] = None
        # self.indexes_int: np.ndarray = None
        # self.indexes_float: np.ndarray = None

        self.checked_reports: List[Tuple[str, Callable[[array1D], None]]] = None

        self.population_size: int = None
        self.progress_stream = None

        # input algorithm's parameters

        assert isinstance(algorithm_parameters, (dict, AlgorithmParams)), (
            "algorithm_parameters must be dict or AlgorithmParams object"
        )
        if not isinstance(algorithm_parameters, AlgorithmParams):
            algorithm_parameters = AlgorithmParams.from_dict(algorithm_parameters)
        algorithm_parameters.validate()
        self.param = algorithm_parameters

        self.crossover, self.real_mutation, self.discrete_mutation, self.selection = algorithm_parameters.get_CMS_funcs()

        # dimension and area bounds
        self.dim = int(dimension)
        assert self.dim > 0, f"dimension count must be int and > 0, got {dimension}"

        if variable_type_mixed is not None:
            warnings.warn(
                f"argument variable_type_mixed is deprecated and will be removed at version 7\n"
                f"use variable_type={tuple(variable_type_mixed)} instead"
            )
            variable_type = variable_type_mixed
        self._set_types_indexes(variable_type)  # types indexes
        self._set_var_boundaries(variable_type, variable_boundaries)  # input variables' boundaries

        # fix mutation probs

        assert can_be_prob(self.param.mutation_probability)
        self.prob_mut = self.param.mutation_probability
        assert self.param.mutation_discrete_probability is None or can_be_prob(self.param.mutation_discrete_probability)
        self.prob_mut_discrete = self.param.mutation_discrete_probability or self.prob_mut

        if self.param.crossover_probability is not None:
            warnings.warn(
                f"crossover_probability is deprecated and will be removed in version 7. "
                f"Reason: it's old and makes no sense"
            )

        #############################################################
        # input function
        if function:
            warnings.warn(
                f"function is deprecated in init constructor and will be removed in version 7. "
                f"Move this argument to run() method"
            )
            self._check_function(function)
        if function_timeout:
            warnings.warn(
                f"function_timeout is deprecated in init constructor and will be removed in version 7. "
                f"Move this argument to run() method"
            )
            self._check_function_timeout(function_timeout)

        #############################################################

        self.population_size = int(self.param.population_size)
        self._set_parents_count(self.param.parents_portion)
        self._set_elit_count(self.population_size, self.param.elit_ratio)
        assert self.parents_count >= self.elit_count, (
            f"\n number of parents ({self.parents_count}) "
            f"must not be less than number of elits ({self.elit_count})"
        )

        self._set_max_iterations()

        self._set_report()

        # specify this function to speed up or change default behaviour
        self.fill_children: Optional[Callable[[array2D, int], None]] = None
        """
        custom function which adds children to population POP,
        where POP[:parents_count] are parent rows and the next rows are for children
        """
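    # Example (hypothetical sketch): a custom `fill_children` hook matching the contract above --
    # it receives the whole population array with parents already placed in the first
    # `parents_count` rows and must fill the remaining rows in place.
    #
    #   def my_fill_children(pop: np.ndarray, parents_count: int) -> None:
    #       for k in range(parents_count, pop.shape[0]):
    #           a, b = np.random.randint(0, parents_count, size=2)
    #           mask = np.random.random(pop.shape[1]) < 0.5  # uniform crossover of two random parents
    #           pop[k] = np.where(mask, pop[a], pop[b])
    #
    #   model.fill_children = my_fill_children  # `model` is a GeneticAlgorithm2 instance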
" 201 f"Move this argument to run() method" 202 ) 203 self._check_function(function) 204 if function_timeout: 205 warnings.warn( 206 f"function_timeout is deprecated in init constructor and will be removed in version 7. " 207 f"Move this argument to run() method" 208 ) 209 self._check_function_timeout(function_timeout) 210 211 ############################################################# 212 213 self.population_size = int(self.param.population_size) 214 self._set_parents_count(self.param.parents_portion) 215 self._set_elit_count(self.population_size, self.param.elit_ratio) 216 assert self.parents_count >= self.elit_count, ( 217 f"\n number of parents ({self.parents_count}) " 218 f"must be greater than number of elits ({self.elit_count})" 219 ) 220 221 self._set_max_iterations() 222 223 self._set_report() 224 225 # specify this function to speed up or change default behaviour 226 self.fill_children: Optional[Callable[[array2D, int], None]] = None 227 """ 228 custom function which adds children for population POP 229 where POP[:parents_count] are parents lines and next lines are for children 230 """ 231 232 def _set_types_indexes(self, variable_type: Union[str, Sequence[str]]): 233 234 indexes = np.arange(self.dim) 235 self.indexes_int = np.array([]) 236 self.indexes_float = np.array([]) 237 238 assert_message = ( 239 f"\n variable_type must be 'bool', 'int', 'real' or a sequence with 'int' and 'real', got {variable_type}" 240 ) 241 242 if isinstance(variable_type, str): 243 assert (variable_type in VARIABLE_TYPE.__args__), assert_message 244 if variable_type == 'real': 245 self.indexes_float = indexes 246 else: 247 self.indexes_int = indexes 248 249 else: # sequence case 250 251 assert len(variable_type) == self.dim, ( 252 f"\n variable_type must have a length equal dimension. 
" 253 f"Should be {self.dim}, got {len(variable_type)}" 254 ) 255 assert 'bool' not in variable_type, ( 256 "don't use 'bool' if variable_type is a sequence, " 257 "for 'boolean' case use 'int' and specify boundary as (0,1)" 258 ) 259 assert all(v in VARIABLE_TYPE.__args__ for v in variable_type), assert_message 260 261 vartypes = np.array(variable_type) 262 self.indexes_int = indexes[vartypes == 'int'] 263 self.indexes_float = indexes[vartypes == 'real'] 264 265 def _set_var_boundaries( 266 self, 267 variable_type: Union[str, Sequence[str]], 268 variable_boundaries 269 ): 270 if isinstance(variable_type, str) and variable_type == 'bool': 271 self.var_bounds = [(0, 1)] * self.dim 272 else: 273 274 if is_numpy(variable_boundaries): 275 assert variable_boundaries.shape == (self.dim, 2), ( 276 f"\n if variable_boundaries is numpy array, it must be with shape (dim, 2)" 277 ) 278 else: 279 assert len(variable_boundaries) == self.dim and all((len(t) == 2 for t in variable_boundaries)), ( 280 "\n if variable_boundaries is sequence, " 281 "it must be with len dim and boundary for each variable must be a tuple of length two" 282 ) 283 284 for i in variable_boundaries: 285 assert i[0] <= i[1], "\n lower_boundaries must be smaller than upper_boundaries [lower,upper]" 286 287 self.var_bounds = [(i[0], i[1]) for i in variable_boundaries] 288 289 def _set_parents_count(self, parents_portion: float): 290 291 self.parents_count = int(parents_portion * self.population_size) 292 assert self.population_size >= self.parents_count > 1, ( 293 f'parents count {self.parents_count} cannot be less than population size {self.population_size}' 294 ) 295 trl = self.population_size - self.parents_count 296 if trl % 2 != 0: 297 self.parents_count += 1 298 299 def _set_elit_count(self, pop_size: int, elit_ratio: Union[float, int]): 300 301 assert elit_ratio >= 0 302 self.elit_count = elit_ratio if isinstance(elit_ratio, str) else math.ceil(pop_size*elit_ratio) 303 304 def _set_max_iterations(self): 305 306 if self.param.max_num_iteration is None: 307 iterate = 0 308 for i in range(0, self.dim): 309 bound_min, bound_max = self.var_bounds[i] 310 var_space = bound_max - bound_min 311 if i in self.indexes_int: 312 iterate += var_space * self.dim * (100 / self.population_size) 313 else: 314 iterate += var_space * 50 * (100 / self.population_size) 315 iterate = int(iterate) 316 if (iterate * self.population_size) > 10000000: 317 iterate = 10000000 / self.population_size 318 319 self.max_iterations = fast_min(iterate, 8000) 320 else: 321 assert self.param.max_num_iteration > 0 322 self.max_iterations = math.ceil(self.param.max_num_iteration) 323 324 max_it = self.param.max_iteration_without_improv 325 if max_it is None: 326 self.max_stagnations = self.max_iterations + 1 327 else: 328 self.max_stagnations = math.ceil(max_it) 329 330 def _check_function(self, function: Callable[[array1D], float]): 331 assert callable(function), "function must be callable!" 
    #region REPORT

    def _set_report(self):
        """
        creates default report checker
        """
        self.checked_reports = [
            # item 0 cuz scores will be sorted and min item is items[0]
            ('report', operator.itemgetter(0))
        ]

    def _clear_report(self):
        """
        removes all report objects
        """
        fields = [f for f in vars(self).keys() if f.startswith('report')]
        for attr in fields:
            delattr(self, attr)

    def _init_report(self):
        """
        makes empty report fields
        """
        for name, _ in self.checked_reports:
            setattr(self, name, [])

    def _update_report(self, scores: array1D):
        """
        appends report value to the end of each field
        """
        for name, func in self.checked_reports:
            getattr(self, name).append(
                func(scores)
            )

    #endregion

    #region RUN METHODS

    def _progress(self, count: int, total: int, status: str = ''):

        part = count / total

        filled_len = round(GeneticAlgorithm2.PROGRESS_BAR_LEN * part)
        percents = round(100.0 * part, 1)
        bar = '|' * filled_len + '_' * (GeneticAlgorithm2.PROGRESS_BAR_LEN - filled_len)

        self.progress_stream.write('\r%s %s%s %s' % (bar, percents, '%', status))
        self.progress_stream.flush()

    def __str__(self):
        return f"Genetic algorithm object with parameters {self.param}"

    def __repr__(self):
        return self.__str__()

    def _simulate(self, sample: array1D):

        from func_timeout import func_timeout, FunctionTimedOut

        obj = None
        eval_time = time.time()
        try:
            obj = func_timeout(
                self.funtimeout,
                lambda: self.f(sample)
            )
        except FunctionTimedOut:
            print("given function is not applicable")
        eval_time = time.time() - eval_time

        assert obj is not None, (
            f"the given function was running for about {eval_time} seconds and did not provide any output"
        )

        tp = type(obj)
        assert (
            tp in (int, float) or np.issubdtype(tp, np.floating) or np.issubdtype(tp, np.integer)
        ), f"Minimized function should return a number, but got '{obj}' object with type {tp}"

        return obj, eval_time

    def _set_mutation_indexes(self, mutation_indexes: Optional[Iterable[int]]):

        if mutation_indexes is None:
            self.indexes_float_mut = self.indexes_float
            self.indexes_int_mut = self.indexes_int
        else:
            tmp_indexes = set(mutation_indexes)
            self.indexes_int_mut = np.array(list(tmp_indexes.intersection(self.indexes_int)))
            self.indexes_float_mut = np.array(list(tmp_indexes.intersection(self.indexes_float)))

            if self.indexes_float_mut.size == 0 and self.indexes_int_mut.size == 0:
                warnings.warn(f"No mutation dimensions! Check your mutation indexes!")
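    # Example (illustrative sketch, not part of the original source): a typical run() call;
    # `sphere` and `model` are the assumed names from the constructor example above.
    #
    #   result = model.run(
    #       function=sphere,
    #       no_plot=True,               # skip the matplotlib plot
    #       progress_bar_stream=None,   # disable the progress bar (often faster)
    #       stop_when_reached=1e-6,     # early stop once this score is reached
    #       time_limit_secs=60,
    #       seed=42
    #   )
    #   best_sample = result.last_generation.variables[0]  # the generation is sorted, best first
    #   best_score = result.last_generation.scores[0]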
    #@profile
    def run(
        self,
        no_plot: bool = False,
        disable_printing: bool = False,
        progress_bar_stream: Optional[str] = 'stdout',

        # deprecated
        disable_progress_bar: bool = False,

        function: FunctionToMinimize = None,
        function_timeout: Optional[float] = None,

        set_function: SetFunctionToMinimize = None,
        apply_function_to_parents: bool = False,
        start_generation: GenerationConvertible = Generation(),
        studEA: bool = False,
        mutation_indexes: Optional[Iterable[int]] = None,

        init_creator: Optional[CreatorFunc] = None,
        init_oppositors: Optional[Sequence[OppositorFunc]] = None,

        duplicates_oppositor: Optional[OppositorFunc] = None,
        remove_duplicates_generation_step: Optional[int] = None,

        revolution_oppositor: Optional[OppositorFunc] = None,
        revolution_after_stagnation_step: Optional[int] = None,
        revolution_part: float = 0.3,

        population_initializer: Tuple[
            int, PopulationModifier
        ] = get_population_initializer(select_best_of=1, local_optimization_step='never', local_optimizer=None),

        stop_when_reached: Optional[float] = None,
        callbacks: Optional[Sequence[SimpleCallbackFunc]] = None,
        middle_callbacks: Optional[Sequence[MiddleCallbackFunc]] = None,  #+
        time_limit_secs: Optional[float] = None,
        save_last_generation_as: Optional[str] = None,
        seed: Optional[int] = None
    ) -> GAResult:
        """
        runs optimization process

        Args:
            no_plot: do not plot results using matplotlib (results are plotted by default)

            disable_printing: do not print log info of optimization process (except progress bar)

            progress_bar_stream: 'stdout', 'stderr' or None to disable progress bar;
                disabling the progress bar can speed up the optimization process in many cases

            disable_progress_bar: deprecated

            function: the given objective function (sample -> its score) to be minimized;

            function_timeout: if the given function does not provide
                output before function_timeout (unit is seconds) the algorithm raises an error.
                For example, when there is an infinite loop in the given function.
                `None` means disabling

            set_function: set function (all samples -> score per sample) to be used instead of the usual function
                (usually for optimization purposes)

            apply_function_to_parents: whether to apply the function to parents from the previous generation (if it's needed);
                it can be needed when working with game agents, but for other tasks it will just waste time

            start_generation: initial generation object of any `GenerationConvertible` type

            studEA: using stud EA strategy (crossover with the best object always)

            mutation_indexes: indexes of dimensions where mutation can be performed (all dimensions by default)

            init_creator: the function which creates population samples.
                By default -- random uniform for real variables and random integers for int variables
            init_oppositors: the list of oppositors which create oppositions for the base population. None by default

            duplicates_oppositor: oppositor to apply after duplicates removing.
                By default -- using just the random initializer from creator
            remove_duplicates_generation_step: step for removing duplicates (makes sense for discrete tasks).
                None by default

            revolution_oppositor: oppositor for revolution time. None by default
            revolution_after_stagnation_step: create revolution after this many generations of stagnation.
                None by default
            revolution_part: float, the part of the generation to be opposed. 0.3 by default

            population_initializer: object for actions at the population initialization step
                to create a better start population. See doc

            stop_when_reached: stop searching after reaching this value
                (it can be the potential minimum or something else)

            callbacks: sequence of callback functions with structure:
                (generation_number, report_list, last_population, last_scores) -> do some action

            middle_callbacks: sequence of functions made by `MiddleCallback` class

            time_limit_secs: limit of working time (in seconds);
                `None` means no time limit (the limit will be only for the count of generations and so on)

            save_last_generation_as: path to .npz file
                for saving last_generation as a numpy dictionary like
                {'population': 2D-array, 'scores': 1D-array};
                None disables this option

            seed: random seed (None if it doesn't matter)

        Returns:
            `GAResult` object;
            also fills self.report and self.result with much report information

        Notes:
            - if `function_timeout` is enabled then `function` must be set
            - it would be more logical to use params like `studEA` as algorithm parameters,
                but the `run()` way can be more convenient for real use
        """
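        # Example (illustrative): an item for the `callbacks` argument, matching the signature
        # described above: (generation_number, report_list, last_population, last_scores) -> None.
        #
        #   def log_progress(gen, report, pop, scores):
        #       print(f"gen {gen}: best={scores[0]}, mean={scores.mean()}")
        #
        #   model.run(function=sphere, callbacks=[log_progress])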
        if disable_progress_bar:
            warnings.warn(
                f"disable_progress_bar is deprecated and will be removed in version 7, "
                f"use progress_bar_stream=None to disable progress bar"
            )
            progress_bar_stream = None

        enable_printing: bool = not disable_printing

        start_generation = Generation.from_object(self.dim, start_generation)

        assert is_current_gen_number(revolution_after_stagnation_step), "must be None or int and >0"
        assert is_current_gen_number(remove_duplicates_generation_step), "must be None or int and >0"
        assert can_be_prob(revolution_part), f"revolution_part must be in [0,1], not {revolution_part}"
        assert stop_when_reached is None or isinstance(stop_when_reached, (int, float))
        assert isinstance(callbacks, collections.abc.Sequence) or callbacks is None, (
            "callbacks should be a list of callback functions"
        )
        assert isinstance(middle_callbacks, collections.abc.Sequence) or middle_callbacks is None, (
            "middle_callbacks should be a list of MiddleCallbacks functions"
        )
        assert time_limit_secs is None or time_limit_secs > 0, 'time_limit_secs must be None or a number > 0'

        self._set_mutation_indexes(mutation_indexes)
        from OppOpPopInit import set_seed
        set_seed(seed)

        # randomstate = np.random.default_rng(random.randint(0, 1000) if seed is None else seed)
        # self.randomstate = randomstate

        # using a bool flag is faster than using an empty function with generated string arguments
        SHOW_PROGRESS = progress_bar_stream is not None
        if SHOW_PROGRESS:

            show_progress = lambda t, t2, s: self._progress(t, t2, status=s)

            if progress_bar_stream == 'stdout':
                self.progress_stream = sys.stdout
            elif progress_bar_stream == 'stderr':
                self.progress_stream = sys.stderr
            else:
                raise Exception(
                    f"wrong value {progress_bar_stream} of progress_bar_stream, must be 'stdout'/'stderr'/None"
                )
        else:
            show_progress = None

        stop_by_val = (
            (lambda best_f: False)
            if stop_when_reached is None
            else (lambda best_f: best_f <= stop_when_reached)
        )

        t: int = 0
        count_stagnation: int = 0
        pop: array2D = None
        scores: array1D = None

        #region CALLBACKS

        def get_data():
            """
            returns all important data about the model
            """
            return MiddleCallbackData(
                last_generation=Generation(pop, scores),
                current_generation=t,
                report_list=self.report,

                mutation_prob=self.prob_mut,
                mutation_discrete_prob=self.prob_mut_discrete,
                mutation=self.real_mutation,
                mutation_discrete=self.discrete_mutation,
                crossover=self.crossover,
                selection=self.selection,

                current_stagnation=count_stagnation,
                max_stagnation=self.max_stagnations,

                parents_portion=self.param.parents_portion,
                elit_ratio=self.param.elit_ratio,

                set_function=self.set_function,

                reason_to_stop=reason_to_stop
            )

        def set_data(data: MiddleCallbackData):
            """
            sets data to the model
            """
            nonlocal pop, scores, count_stagnation, reason_to_stop

            pop, scores = data.last_generation.variables, data.last_generation.scores
            self.population_size = pop.shape[0]

            self.param.parents_portion = data.parents_portion
            self._set_parents_count(data.parents_portion)

            self.param.elit_ratio = data.elit_ratio
            self._set_elit_count(self.population_size, data.elit_ratio)

            self.prob_mut = data.mutation_prob
            self.prob_mut_discrete = data.mutation_discrete_prob
            self.real_mutation = data.mutation
            self.discrete_mutation = data.mutation_discrete
            self.crossover = data.crossover
            self.selection = data.selection

            count_stagnation = data.current_stagnation
            reason_to_stop = data.reason_to_stop
            self.max_stagnations = data.max_stagnation

            self.set_function = data.set_function

        if not callbacks:
            total_callback = lambda g, r, lp, ls: None
        else:
            def total_callback(
                generation_number: int,
                report_list: List[float],
                last_population: array2D,
                last_scores: array1D
            ):
                for cb in callbacks:
                    cb(generation_number, report_list, last_population, last_scores)

        if not middle_callbacks:
            total_middle_callback = lambda: None
        else:
            def total_middle_callback():
                """
                applies callbacks and sets new data if it makes sense
                """
                data = get_data()
                flag = False
                for cb in middle_callbacks:
                    data, has_sense = cb(data)
                    if has_sense:
                        flag = True
                if flag:
                    set_data(data)  # update global data if there was a real callback step

        #endregion
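        # Example (illustrative): an item for the `middle_callbacks` argument. Judging by the loop
        # above, each such callback takes a MiddleCallbackData object and returns (data, changed-flag).
        #
        #   def raise_mutation_on_stagnation(data):
        #       if data.current_stagnation > 20 and data.mutation_prob < 0.5:
        #           data.mutation_prob = 0.5  # field names as used in get_data()/set_data() above
        #           return data, True
        #       return data, False
        #
        #   model.run(function=sphere, middle_callbacks=[raise_mutation_on_stagnation])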
        start_time = time.time()
        time_is_done = (
            (lambda: False)
            if time_limit_secs is None
            else (lambda: int(time.time() - start_time) >= time_limit_secs)
        )

        # combine with deprecated parts
        function = function or self.f
        function_timeout = function_timeout or self.funtimeout

        assert function or set_function, 'no function to minimize'
        if function:
            self._check_function(function)
        if function_timeout:
            self._check_function_timeout(function_timeout)

        self.set_function = set_function or GeneticAlgorithm2.default_set_function(self.f)

        #region Initial population, duplicates filter, revolutionary

        pop_coef, initializer_func = population_initializer

        # population creator by random or with oppositions
        if init_creator is None:

            from OppOpPopInit import SampleInitializers

            # just uniform random
            self.creator = SampleInitializers.Combined(
                minimums=[v[0] for v in self.var_bounds],
                maximums=[v[1] for v in self.var_bounds],
                indexes=(
                    self.indexes_int,
                    self.indexes_float
                ),
                creator_initializers=(
                    SampleInitializers.RandomInteger,
                    SampleInitializers.Uniform
                )
            )
        else:
            assert callable(init_creator)
            self.creator = init_creator

        self.init_oppositors = init_oppositors
        self.dup_oppositor = duplicates_oppositor
        self.revolution_oppositor = revolution_oppositor
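        # Example (hypothetical sketch): passing a custom `init_creator` to run(). The zero-argument
        # signature returning a single sample is an assumption based on how `self.creator` is consumed
        # by SampleInitializers.CreateSamples / init_population later in this method.
        #
        #   def my_creator() -> np.ndarray:
        #       x = np.random.uniform(-5, 5, size=3)  # 3 == dimension of the task
        #       x[2] = round(x[2])                    # keep the int variable integral
        #       return x
        #
        #   model.run(function=sphere, init_creator=my_creator)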
        # event for removing duplicates
        if remove_duplicates_generation_step is None:
            def remover(pop: array2D, scores: array1D, gen: int) -> Tuple[
                array2D,
                array1D
            ]:
                return pop, scores
        else:

            def without_dup(pop: array2D, scores: array1D):  # returns population without dups
                _, index_of_dups = np.unique(pop, axis=0, return_index=True)
                return pop[index_of_dups], scores[index_of_dups], scores.size - index_of_dups.size

            if self.dup_oppositor is None:  # if there is no dup_oppositor, use the random creator
                def remover(pop: array2D, scores: array1D, gen: int) -> Tuple[
                    array2D,
                    array1D
                ]:
                    if gen % remove_duplicates_generation_step != 0:
                        return pop, scores

                    pp, sc, count_to_create = without_dup(pop, scores)  # pop without dups

                    if count_to_create == 0:
                        if SHOW_PROGRESS:
                            show_progress(t, self.max_iterations,
                                          f"GA is running...{t} gen from {self.max_iterations}. No dups!")
                        return pop, scores

                    pp2 = SampleInitializers.CreateSamples(self.creator, count_to_create)  # new pop elements
                    pp2_scores = self.set_function(pp2)  # new elements values

                    if SHOW_PROGRESS:
                        show_progress(t, self.max_iterations,
                                      f"GA is running...{t} gen from {self.max_iterations}. Kill dups!")

                    new_pop = np.vstack((pp, pp2))
                    new_scores = np.concatenate((sc, pp2_scores))

                    args_to_sort = new_scores.argsort()
                    return new_pop[args_to_sort], new_scores[args_to_sort]

            else:  # using oppositors
                assert callable(self.dup_oppositor)

                def remover(pop: np.ndarray, scores: np.ndarray, gen: int) -> Tuple[
                    np.ndarray,
                    np.ndarray
                ]:
                    if gen % remove_duplicates_generation_step != 0:
                        return pop, scores

                    pp, sc, count_to_create = without_dup(pop, scores)  # pop without dups

                    if count_to_create == 0:
                        if SHOW_PROGRESS:
                            show_progress(t, self.max_iterations,
                                          f"GA is running...{t} gen from {self.max_iterations}. No dups!")
                        return pop, scores

                    if count_to_create > sc.size:
                        raise Exception(
                            f"Too many duplicates at generation {gen} ({count_to_create} > {sc.size}), cannot oppose"
                        )

                    # oppose count_to_create worse elements
                    pp2 = OppositionOperators.Reflect(pp[-count_to_create:], self.dup_oppositor)  # new pop elements
                    pp2_scores = self.set_function(pp2)  # new elements values

                    if SHOW_PROGRESS:
                        show_progress(t, self.max_iterations,
                                      f"GA is running...{t} gen from {self.max_iterations}. Kill dups!")

                    new_pop = np.vstack((pp, pp2))
                    new_scores = np.concatenate((sc, pp2_scores))

                    args_to_sort = new_scores.argsort()
                    return new_pop[args_to_sort], new_scores[args_to_sort]

        # event for revolution
        if revolution_after_stagnation_step is None:
            def revolution(pop: array2D, scores: array1D, stagnation_count: int) -> Tuple[
                array2D,
                array1D
            ]:
                return pop, scores
        else:
            if revolution_oppositor is None:
                raise Exception(
                    f"How can I make a revolution each {revolution_after_stagnation_step} stagnation steps "
                    f"if revolution_oppositor is None (not defined)?"
                )
            assert callable(revolution_oppositor)

            from OppOpPopInit import OppositionOperators

            def revolution(pop: array2D, scores: array1D, stagnation_count: int) -> Tuple[
                array2D,
                array1D
            ]:
                if stagnation_count < revolution_after_stagnation_step:
                    return pop, scores
                part = int(pop.shape[0] * revolution_part)

                pp2 = OppositionOperators.Reflect(pop[-part:], self.revolution_oppositor)
                pp2_scores = self.set_function(pp2)

                combined = np.vstack((pop, pp2))
                combined_scores = np.concatenate((scores, pp2_scores))
                args = combined_scores.argsort()

                if SHOW_PROGRESS:
                    show_progress(t, self.max_iterations,
                                  f"GA is running...{t} gen from {self.max_iterations}. Revolution!")

                args = args[:scores.size]
                return combined[args], combined_scores[args]

        #endregion
        #
        #
        #  START ALGORITHM LOGIC
        #
        #

        # Report
        self._clear_report()  # clear old report objects
        self._init_report()

        # initialization of pop

        if start_generation.variables is None:

            from OppOpPopInit import init_population

            real_pop_size = self.population_size * pop_coef

            # pop = np.empty((real_pop_size, self.dim))
            scores = np.empty(real_pop_size)

            pop = init_population(
                samples_count=real_pop_size,
                creator=self.creator,
                oppositors=self.init_oppositors
            )

            if self.funtimeout and self.funtimeout > 0:  # perform simulation

                time_counter = 0

                for p in range(0, real_pop_size):
                    # simulation returns exception or func value -- check the time of evaluating
                    value, eval_time = self._simulate(pop[p])
                    scores[p] = value
                    time_counter += eval_time

                if enable_printing:
                    print(
                        f"\nSim: Average time of function evaluating (secs): "
                        f"{time_counter / real_pop_size} (total = {time_counter})\n"
                    )
            else:

                eval_time = time.time()
                scores = self.set_function(pop)
                eval_time = time.time() - eval_time
                if enable_printing:
                    print(
                        f"\nSet: Average time of function evaluating (secs): "
                        f"{eval_time / real_pop_size} (total = {eval_time})\n"
                    )

        else:

            self.population_size = start_generation.variables.shape[0]
            self._set_elit_count(self.population_size, self.param.elit_ratio)
            self._set_parents_count(self.param.parents_portion)

            pop = start_generation.variables

            if start_generation.scores is None:

                _time = time.time()
                scores = self.set_function(pop)
                _time = time.time() - _time

                if enable_printing:
                    print(
                        f'\nFirst scores are made from the given variables '
                        f'(in {_time} secs, about {_time / scores.size} for each creature)\n'
                    )
            else:
                scores = start_generation.scores
                if enable_printing:
                    print(f"\nFirst scores are from the given population\n")

        # Initialization by select bests and local_descent

        pop, scores = initializer_func(pop, scores)

        # first sort
        args_to_sort = scores.argsort()
        pop = pop[args_to_sort]
        scores = scores[args_to_sort]
        self._update_report(scores)

        self.population_size = scores.size
        self.best_function = scores[0]

        if enable_printing:
            print(
                f"Best score before optimization: {self.best_function}"
            )

        t: int = 1
        count_stagnation: int = 0
        """iterations without progress"""
        reason_to_stop: Optional[str] = None

        # gets indexes of 2 parents to crossover
        if studEA:
            get_parents_inds = lambda: (0, random.randrange(1, self.parents_count))
        else:
            get_parents_inds = lambda: random_indexes_pair(self.parents_count)
        # while there is no reason to stop
        while True:

            if count_stagnation > self.max_stagnations:
                reason_to_stop = f"limit of fails: {count_stagnation}"
            elif t == self.max_iterations:
                reason_to_stop = f'limit of iterations: {t}'
            elif stop_by_val(self.best_function):
                reason_to_stop = f"stop value reached: {self.best_function} <= {stop_when_reached}"
            elif time_is_done():
                reason_to_stop = f'time is done: {time.time() - start_time} >= {time_limit_secs}'

            if reason_to_stop is not None:
                if SHOW_PROGRESS:
                    show_progress(t, self.max_iterations, f"GA is running... STOP! {reason_to_stop}")
                break

            if scores[0] < self.best_function:  # if there is progress
                count_stagnation = 0
                self.best_function = scores[0]
            else:
                count_stagnation += 1

            if SHOW_PROGRESS:
                show_progress(
                    t,
                    self.max_iterations,
                    f"GA is running...{t} gen from {self.max_iterations}...best value = {self.best_function}"
                )

            # Select parents

            par: array2D = np.empty((self.parents_count, self.dim))
            """samples chosen to create new samples"""
            par_scores: array1D = np.empty(self.parents_count)

            elit_slice = slice(None, self.elit_count)
            """elit parents"""

            # a copy is needed because the generation will be removed after parents selection
            par[elit_slice] = pop[elit_slice].copy()
            par_scores[elit_slice] = scores[elit_slice].copy()

            new_par_inds = (
                self.selection(
                    scores[self.elit_count:],
                    self.parents_count - self.elit_count
                ).astype(np.int16) + self.elit_count
            )
            """non-elit parents indexes"""
            #new_par_inds = self.selection(scores, self.parents_count - self.elit_count).astype(np.int16)
            par_slice = slice(self.elit_count, self.parents_count)
            par[par_slice] = pop[new_par_inds].copy()
            par_scores[par_slice] = scores[new_par_inds].copy()

            pop = np.empty((self.population_size, self.dim))
            """new generation"""
            scores = np.empty(self.population_size)
            """new generation scores"""

            parents_slice = slice(None, self.parents_count)
            pop[parents_slice] = par
            scores[parents_slice] = par_scores

            if self.fill_children is None:  # default fill children behaviour
                DO_MUTATION = self.needs_mutation
                for k in range(self.parents_count, self.population_size, 2):

                    r1, r2 = get_parents_inds()

                    pvar1 = pop[r1]  # equal to par[r1], but better for cache
                    pvar2 = pop[r2]

                    ch1, ch2 = self.crossover(pvar1, pvar2)

                    if DO_MUTATION:
                        ch1 = self.mut(ch1)
                        ch2 = self.mut_middle(ch2, pvar1, pvar2)

                    pop[k] = ch1
                    pop[k + 1] = ch2
            else:  # custom behaviour
                self.fill_children(pop, self.parents_count)

            if apply_function_to_parents:
                scores = self.set_function(pop)
            else:
                scores[self.parents_count:] = self.set_function(pop[self.parents_count:])

            # remove duplicates
            pop, scores = remover(pop, scores, t)
            # revolution
            pop, scores = revolution(pop, scores, count_stagnation)

            # make population better
            args_to_sort = scores.argsort()
            pop = pop[args_to_sort]
            scores = scores[args_to_sort]
            self._update_report(scores)

            # callback it
            total_callback(t, self.report, pop, scores)
            total_middle_callback()

            t += 1

        self.best_function = fast_min(scores[0], self.best_function)
        last_generation = Generation(pop, scores)
        self.result = GAResult(last_generation)

        if save_last_generation_as is not None:
            last_generation.save(save_last_generation_as)

        if enable_printing:
            show = ' ' * 200
            sys.stdout.write(
                f'\r{show}\n'
                f'\r The best found solution:\n {pop[0]}'
                f'\n\n Objective function:\n {self.best_function}\n'
                f'\n Used generations: {len(self.report)}'
                f'\n Used time: {time.time() - start_time:.3g} seconds\n'
            )
            sys.stdout.flush()

        if not no_plot:
            self.plot_results()

        if enable_printing:
            if reason_to_stop is not None and 'iterations' not in reason_to_stop:
                sys.stdout.write(
                    f'\nWarning: GA is terminated because of {reason_to_stop}'
                )

        return self.result

    #endregion

    #region PLOTTING

    def plot_results(
        self,
        title: str = 'Genetic Algorithm',
        save_as: Optional[str] = None,
        dpi: int = 200,
        main_color: str = 'blue'
    ):
        """
        Simple plot of self.report (if not empty)
        """
        if len(self.report) == 0:
            sys.stdout.write("No results to plot!\n")
            return

        plot_several_lines(
            lines=[self.report],
            colors=[main_color],
            labels=['best of generation'],
            linewidths=[2],
            title=title,
            xlabel='Generation',
            ylabel='Minimized function',
            save_as=save_as,
            dpi=dpi
        )

    def plot_generation_scores(self, title: str = 'Last generation scores', save_as: Optional[str] = None):
        """
        Plots a barplot of the scores of the last population
        """

        if not hasattr(self, 'result'):
            raise Exception(
                "There is no 'result' field in the ga object! Before plotting a generation you need to run the searching process"
            )

        plot_pop_scores(self.result.last_generation.scores, title, save_as)

    #endregion
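    # Example (illustrative): plotting after a finished run; both methods above use the
    # report/result fields filled by run().
    #
    #   model.run(function=sphere, no_plot=True)
    #   model.plot_results(title='convergence', save_as='ga_convergence.png')
    #   model.plot_generation_scores(title='last generation', save_as='last_gen.png')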
    #region MUTATION

    def mut(self, x: array1D):
        """
        just mutation
        """

        for i in self.indexes_int_mut:
            if random.random() < self.prob_mut_discrete:
                x[i] = self.discrete_mutation(x[i], *self.var_bounds[i])

        for i in self.indexes_float_mut:
            if random.random() < self.prob_mut:
                x[i] = self.real_mutation(x[i], *self.var_bounds[i])

        return x

    def mut_middle(self, x: array1D, p1: array1D, p2: array1D):
        """
        mutation oriented on parents
        """
        for i in self.indexes_int_mut:

            if random.random() < self.prob_mut_discrete:
                v1, v2 = p1[i], p2[i]
                if v1 < v2:
                    x[i] = random.randint(v1, v2)
                elif v1 > v2:
                    x[i] = random.randint(v2, v1)
                else:
                    x[i] = random.randint(*self.var_bounds[i])

        for i in self.indexes_float_mut:
            if random.random() < self.prob_mut:
                v1, v2 = p1[i], p2[i]
                if v1 != v2:
                    x[i] = random.uniform(v1, v2)
                else:
                    x[i] = random.uniform(*self.var_bounds[i])
        return x

    #endregion

    #region Set functions

    @staticmethod
    def default_set_function(function_for_set: Callable[[array1D], float]):
        """
        simple function for creating set_function;
        function_for_set is just applied to each row of the population
        """
        def func(matrix: array2D):
            return np.array(
                [function_for_set(row) for row in matrix]
            )
        return func

    @staticmethod
    def vectorized_set_function(function_for_set: Callable[[array1D], float]):
        """
        works like default, but faster for big populations and slower for small ones;
        function_for_set is just applied to each row of the population
        """

        func = np.vectorize(function_for_set, signature='(n)->()')
        return func

    @staticmethod
    def set_function_multiprocess(function_for_set: Callable[[array1D], float], n_jobs: int = -1):
        """
        like default_set_function, but uses joblib with n_jobs (-1 means the count of available processors)
        """
        try:
            from joblib import Parallel, delayed
        except ModuleNotFoundError:
            raise ModuleNotFoundError(
                "this additional feature requires the joblib package,\n"
                "run `pip install joblib` or `pip install --upgrade geneticalgorithm2[full]`\n"
                "or use another set function"
            )

        def func(matrix: array2D):
            result = Parallel(n_jobs=n_jobs)(delayed(function_for_set)(matrix[i]) for i in range(matrix.shape[0]))
            return np.array(result)
        return func

    #endregion
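# Example (illustrative sketch, not part of the original source): choosing a set function;
# `sphere` and `model` are the assumed names from the examples above.
#
#   plain = GeneticAlgorithm2.default_set_function(sphere)               # simple Python loop
#   vect = GeneticAlgorithm2.vectorized_set_function(sphere)             # np.vectorize over rows
#   par = GeneticAlgorithm2.set_function_multiprocess(sphere, n_jobs=4)  # joblib processes
#
#   model.run(function=sphere, set_function=par)  # run() uses it instead of wrapping `function` itself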
Kill dups!") 792 793 new_pop = np.vstack((pp, pp2)) 794 new_scores = np.concatenate((sc, pp2_scores)) 795 796 args_to_sort = new_scores.argsort() 797 return new_pop[args_to_sort], new_scores[args_to_sort] 798 799 else: # using oppositors 800 assert callable(self.dup_oppositor) 801 802 def remover(pop: np.ndarray, scores: np.ndarray, gen: int) -> Tuple[ 803 np.ndarray, 804 np.ndarray 805 ]: 806 if gen % remove_duplicates_generation_step != 0: 807 return pop, scores 808 809 pp, sc, count_to_create = without_dup(pop, scores) # pop without dups 810 811 if count_to_create == 0: 812 if SHOW_PROGRESS: 813 show_progress(t, self.max_iterations, 814 f"GA is running...{t} gen from {self.max_iterations}. No dups!") 815 return pop, scores 816 817 if count_to_create > sc.size: 818 raise Exception( 819 f"Too many duplicates at generation {gen} ({count_to_create} > {sc.size}), cannot oppose" 820 ) 821 822 # oppose count_to_create worse elements 823 pp2 = OppositionOperators.Reflect(pp[-count_to_create:], self.dup_oppositor) # new pop elements 824 pp2_scores = self.set_function(pp2) # new elements values 825 826 if SHOW_PROGRESS: 827 show_progress(t, self.max_iterations, 828 f"GA is running...{t} gen from {self.max_iterations}. Kill dups!") 829 830 new_pop = np.vstack((pp, pp2)) 831 new_scores = np.concatenate((sc, pp2_scores)) 832 833 args_to_sort = new_scores.argsort() 834 return new_pop[args_to_sort], new_scores[args_to_sort] 835 836 # event for revolution 837 if revolution_after_stagnation_step is None: 838 def revolution(pop: array2D, scores: array1D, stagnation_count: int) -> Tuple[ 839 array2D, 840 array1D 841 ]: 842 return pop, scores 843 else: 844 if revolution_oppositor is None: 845 raise Exception( 846 f"How can I make revolution each {revolution_after_stagnation_step} stagnation steps " 847 f"if revolution_oppositor is None (not defined)?" 848 ) 849 assert callable(revolution_oppositor) 850 851 from OppOpPopInit import OppositionOperators 852 853 def revolution(pop: array2D, scores: array1D, stagnation_count: int) -> Tuple[ 854 array2D, 855 array1D 856 ]: 857 if stagnation_count < revolution_after_stagnation_step: 858 return pop, scores 859 part = int(pop.shape[0]*revolution_part) 860 861 pp2 = OppositionOperators.Reflect(pop[-part:], self.revolution_oppositor) 862 pp2_scores = self.set_function(pp2) 863 864 combined = np.vstack((pop, pp2)) 865 combined_scores = np.concatenate((scores, pp2_scores)) 866 args = combined_scores.argsort() 867 868 if SHOW_PROGRESS: 869 show_progress(t, self.max_iterations, 870 f"GA is running...{t} gen from {self.max_iterations}. 
Revolution!") 871 872 args = args[:scores.size] 873 return combined[args], combined_scores[args] 874 875 #enregion 876 877 # 878 # 879 # START ALGORITHM LOGIC 880 # 881 # 882 883 # Report 884 self._clear_report() # clear old report objects 885 self._init_report() 886 887 # initialization of pop 888 889 if start_generation.variables is None: 890 891 from OppOpPopInit import init_population 892 893 real_pop_size = self.population_size * pop_coef 894 895 # pop = np.empty((real_pop_size, self.dim)) 896 scores = np.empty(real_pop_size) 897 898 pop = init_population( 899 samples_count=real_pop_size, 900 creator=self.creator, 901 oppositors=self.init_oppositors 902 ) 903 904 if self.funtimeout and self.funtimeout > 0: # perform simulation 905 906 time_counter = 0 907 908 for p in range(0, real_pop_size): 909 # simulation returns exception or func value -- check the time of evaluating 910 value, eval_time = self._simulate(pop[p]) 911 scores[p] = value 912 time_counter += eval_time 913 914 if enable_printing: 915 print( 916 f"\nSim: Average time of function evaluating (secs): " 917 f"{time_counter/real_pop_size} (total = {time_counter})\n" 918 ) 919 else: 920 921 eval_time = time.time() 922 scores = self.set_function(pop) 923 eval_time = time.time() - eval_time 924 if enable_printing: 925 print( 926 f"\nSet: Average time of function evaluating (secs): " 927 f"{eval_time/real_pop_size} (total = {eval_time})\n" 928 ) 929 930 else: 931 932 self.population_size = start_generation.variables.shape[0] 933 self._set_elit_count(self.population_size, self.param.elit_ratio) 934 self._set_parents_count(self.param.parents_portion) 935 936 pop = start_generation.variables 937 938 if start_generation.scores is None: 939 940 _time = time.time() 941 scores = self.set_function(pop) 942 _time = time.time() - _time 943 944 if enable_printing: 945 print( 946 f'\nFirst scores are made from gotten variables ' 947 f'(by {_time} secs, about {_time/scores.size} for each creature)\n' 948 ) 949 else: 950 scores = start_generation.scores 951 if enable_printing: 952 print(f"\nFirst scores are from gotten population\n") 953 954 # Initialization by select bests and local_descent 955 956 pop, scores = initializer_func(pop, scores) 957 958 # first sort 959 args_to_sort = scores.argsort() 960 pop = pop[args_to_sort] 961 scores = scores[args_to_sort] 962 self._update_report(scores) 963 964 self.population_size = scores.size 965 self.best_function = scores[0] 966 967 if enable_printing: 968 print( 969 f"Best score before optimization: {self.best_function}" 970 ) 971 972 t: int = 1 973 count_stagnation: int = 0 974 """iterations without progress""" 975 reason_to_stop: Optional[str] = None 976 977 # gets indexes of 2 parents to crossover 978 if studEA: 979 get_parents_inds = lambda: (0, random.randrange(1, self.parents_count)) 980 else: 981 get_parents_inds = lambda: random_indexes_pair(self.parents_count) 982 983 # while no reason to stop 984 while True: 985 986 if count_stagnation > self.max_stagnations: 987 reason_to_stop = f"limit of fails: {count_stagnation}" 988 elif t == self.max_iterations: 989 reason_to_stop = f'limit of iterations: {t}' 990 elif stop_by_val(self.best_function): 991 reason_to_stop = f"stop value reached: {self.best_function} <= {stop_when_reached}" 992 elif time_is_done(): 993 reason_to_stop = f'time is done: {time.time() - start_time} >= {time_limit_secs}' 994 995 if reason_to_stop is not None: 996 if SHOW_PROGRESS: 997 show_progress(t, self.max_iterations, f"GA is running... STOP! 
{reason_to_stop}") 998 break 999 1000 if scores[0] < self.best_function: # if there is progress 1001 count_stagnation = 0 1002 self.best_function = scores[0] 1003 else: 1004 count_stagnation += 1 1005 1006 if SHOW_PROGRESS: 1007 show_progress( 1008 t, 1009 self.max_iterations, 1010 f"GA is running...{t} gen from {self.max_iterations}...best value = {self.best_function}" 1011 ) 1012 1013 # Select parents 1014 1015 par: array2D = np.empty((self.parents_count, self.dim)) 1016 """samples chosen to create new samples""" 1017 par_scores: array1D = np.empty(self.parents_count) 1018 1019 elit_slice = slice(None, self.elit_count) 1020 """elit parents""" 1021 1022 # copy needs because the generation will be removed after parents selection 1023 par[elit_slice] = pop[elit_slice].copy() 1024 par_scores[elit_slice] = scores[elit_slice].copy() 1025 1026 new_par_inds = ( 1027 self.selection( 1028 scores[self.elit_count:], 1029 self.parents_count - self.elit_count 1030 ).astype(np.int16) + self.elit_count 1031 ) 1032 """non-elit parents indexes""" 1033 #new_par_inds = self.selection(scores, self.parents_count - self.elit_count).astype(np.int16) 1034 par_slice = slice(self.elit_count, self.parents_count) 1035 par[par_slice] = pop[new_par_inds].copy() 1036 par_scores[par_slice] = scores[new_par_inds].copy() 1037 1038 pop = np.empty((self.population_size, self.dim)) 1039 """new generation""" 1040 scores = np.empty(self.population_size) 1041 """new generation scores""" 1042 1043 parents_slice = slice(None, self.parents_count) 1044 pop[parents_slice] = par 1045 scores[parents_slice] = par_scores 1046 1047 if self.fill_children is None: # default fill children behaviour 1048 DO_MUTATION = self.needs_mutation 1049 for k in range(self.parents_count, self.population_size, 2): 1050 1051 r1, r2 = get_parents_inds() 1052 1053 pvar1 = pop[r1] # equal to par[r1], but better for cache 1054 pvar2 = pop[r2] 1055 1056 ch1, ch2 = self.crossover(pvar1, pvar2) 1057 1058 if DO_MUTATION: 1059 ch1 = self.mut(ch1) 1060 ch2 = self.mut_middle(ch2, pvar1, pvar2) 1061 1062 pop[k] = ch1 1063 pop[k+1] = ch2 1064 else: # custom behaviour 1065 self.fill_children(pop, self.parents_count) 1066 1067 if apply_function_to_parents: 1068 scores = self.set_function(pop) 1069 else: 1070 scores[self.parents_count:] = self.set_function(pop[self.parents_count:]) 1071 1072 # remove duplicates 1073 pop, scores = remover(pop, scores, t) 1074 # revolution 1075 pop, scores = revolution(pop, scores, count_stagnation) 1076 1077 # make population better 1078 args_to_sort = scores.argsort() 1079 pop = pop[args_to_sort] 1080 scores = scores[args_to_sort] 1081 self._update_report(scores) 1082 1083 # callback it 1084 total_callback(t, self.report, pop, scores) 1085 total_middle_callback() 1086 1087 t += 1 1088 1089 self.best_function = fast_min(scores[0], self.best_function) 1090 1091 last_generation = Generation(pop, scores) 1092 self.result = GAResult(last_generation) 1093 1094 if save_last_generation_as is not None: 1095 last_generation.save(save_last_generation_as) 1096 1097 if enable_printing: 1098 show = ' ' * 200 1099 sys.stdout.write( 1100 f'\r{show}\n' 1101 f'\r The best found solution:\n {pop[0]}' 1102 f'\n\n Objective function:\n {self.best_function}\n' 1103 f'\n Used generations: {len(self.report)}' 1104 f'\n Used time: {time.time() - start_time:.3g} seconds\n' 1105 ) 1106 sys.stdout.flush() 1107 1108 if not no_plot: 1109 self.plot_results() 1110 1111 if enable_printing: 1112 if reason_to_stop is not None and 'iterations' not in reason_to_stop: 1113 
sys.stdout.write( 1114 f'\nWarning: GA is terminated because of {reason_to_stop}' 1115 ) 1116 1117 return self.result 1118 1119 #endregion 1120 1121 #region PLOTTING 1122 1123 def plot_results( 1124 self, 1125 title: str = 'Genetic Algorithm', 1126 save_as: Optional[str] = None, 1127 dpi: int = 200, 1128 main_color: str = 'blue' 1129 ): 1130 """ 1131 Simple plot of self.report (if not empty) 1132 """ 1133 if len(self.report) == 0: 1134 sys.stdout.write("No results to plot!\n") 1135 return 1136 1137 plot_several_lines( 1138 lines=[self.report], 1139 colors=[main_color], 1140 labels=['best of generation'], 1141 linewidths=[2], 1142 title=title, 1143 xlabel='Generation', 1144 ylabel='Minimized function', 1145 save_as=save_as, 1146 dpi=dpi 1147 ) 1148 1149 def plot_generation_scores(self, title: str = 'Last generation scores', save_as: Optional[str] = None): 1150 """ 1151 Plots barplot of scores of last population 1152 """ 1153 1154 if not hasattr(self, 'result'): 1155 raise Exception( 1156 "There is no 'result' field into ga object! Before plotting generation u need to run seaching process" 1157 ) 1158 1159 plot_pop_scores(self.result.last_generation.scores, title, save_as) 1160 1161 #endregion 1162 1163 #region MUTATION 1164 def mut(self, x: array1D): 1165 """ 1166 just mutation 1167 """ 1168 1169 for i in self.indexes_int_mut: 1170 if random.random() < self.prob_mut_discrete: 1171 x[i] = self.discrete_mutation(x[i], *self.var_bounds[i]) 1172 1173 for i in self.indexes_float_mut: 1174 if random.random() < self.prob_mut: 1175 x[i] = self.real_mutation(x[i], *self.var_bounds[i]) 1176 1177 return x 1178 1179 def mut_middle(self, x: array1D, p1: array1D, p2: array1D): 1180 """ 1181 mutation oriented on parents 1182 """ 1183 for i in self.indexes_int_mut: 1184 1185 if random.random() < self.prob_mut_discrete: 1186 v1, v2 = p1[i], p2[i] 1187 if v1 < v2: 1188 x[i] = random.randint(v1, v2) 1189 elif v1 > v2: 1190 x[i] = random.randint(v2, v1) 1191 else: 1192 x[i] = random.randint(*self.var_bounds[i]) 1193 1194 for i in self.indexes_float_mut: 1195 if random.random() < self.prob_mut: 1196 v1, v2 = p1[i], p2[i] 1197 if v1 != v2: 1198 x[i] = random.uniform(v1, v2) 1199 else: 1200 x[i] = random.uniform(*self.var_bounds[i]) 1201 return x 1202 1203 #endregion 1204 1205 #region Set functions 1206 1207 @staticmethod 1208 def default_set_function(function_for_set: Callable[[array1D], float]): 1209 """ 1210 simple function for creating set_function 1211 function_for_set just applies to each row of population 1212 """ 1213 def func(matrix: array2D): 1214 return np.array( 1215 [function_for_set(row) for row in matrix] 1216 ) 1217 return func 1218 1219 @staticmethod 1220 def vectorized_set_function(function_for_set: Callable[[array1D], float]): 1221 """ 1222 works like default, but faster for big populations and slower for little 1223 function_for_set just applyes to each row of population 1224 """ 1225 1226 func = np.vectorize(function_for_set, signature='(n)->()') 1227 return func 1228 1229 @staticmethod 1230 def set_function_multiprocess(function_for_set: Callable[[array1D], float], n_jobs: int = -1): 1231 """ 1232 like function_for_set but uses joblib with n_jobs (-1 goes to count of available processors) 1233 """ 1234 try: 1235 from joblib import Parallel, delayed 1236 except ModuleNotFoundError: 1237 raise ModuleNotFoundError( 1238 "this additional feature requires joblib package," 1239 "run `pip install joblib` or `pip install --upgrade geneticalgorithm2[full]`" 1240 "or use another set function" 1241 ) 
1242 1243 def func(matrix: array2D): 1244 result = Parallel(n_jobs=n_jobs)(delayed(function_for_set)(matrix[i]) for i in range(matrix.shape[0])) 1245 return np.array(result) 1246 return func 1247 1248 #endregion
Genetic algorithm optimization process
79 def __init__( 80 self, 81 function: FunctionToMinimize = None, 82 83 dimension: int = 0, 84 variable_type: Union[VARIABLE_TYPE, Sequence[VARIABLE_TYPE]] = 'bool', 85 variable_boundaries: Optional[Union[array2D, Sequence[Tuple[float, float]]]] = None, 86 87 variable_type_mixed=None, 88 89 function_timeout: Optional[float] = None, 90 algorithm_parameters: Union[AlgorithmParams, Dict[str, Any]] = default_params 91 ): 92 """ 93 initializes the GA object and performs main checks 94 95 Args: 96 function: the given objective function to be minimized -- deprecated and moved to run() method 97 dimension: the number of decision variables, the population samples dimension 98 99 variable_type: string means the variable type for all variables, 100 for mixed types use sequence of strings of type for each variable 101 102 variable_boundaries: leave it None if variable_type is 'bool'; 103 otherwise provide a sequence of tuples of length two as boundaries for each variable; 104 the length of the array must be equal dimension. 105 For example, ([0,100], [0,200]) determines 106 lower boundary 0 and upper boundary 100 for first 107 and upper boundary 200 for second variable 108 and dimension must be 2. 109 110 variable_type_mixed -- deprecated 111 112 function_timeout: if the given function does not provide 113 output before function_timeout (unit is seconds) the algorithm raises error. 114 For example, when there is an infinite loop in the given function. 115 `None` means disabling -- deprecated and moved to run() 116 117 algorithm_parameters: AlgorithmParams object or usual dictionary with algorithm parameter; 118 it is not mandatory to provide all possible parameters 119 120 Notes: 121 - This implementation minimizes the given objective function. 122 For maximization u can multiply the function by -1 (for instance): the absolute 123 value of the output would be the actual objective function 124 125 for more details and examples of implementation please visit: 126 https://github.com/PasaOpasen/geneticalgorithm2 127 128 """ 129 130 # all default fields 131 132 # self.crossover: Callable[[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray]] = None 133 # self.real_mutation: Callable[[float, float, float], float] = None 134 # self.discrete_mutation: Callable[[int, int, int], int] = None 135 # self.selection: Callable[[np.ndarray, int], np.ndarray] = None 136 137 self.result = None 138 self.best_function = None 139 140 self.revolution_oppositor = None 141 self.dup_oppositor = None 142 self.creator = None 143 self.init_oppositors = None 144 145 self.f: Callable[[array1D], float] = None 146 self.funtimeout: float = None 147 148 self.set_function: Callable[[np.ndarray], np.ndarray] = None 149 150 # self.dim: int = None 151 self.var_bounds: List[Tuple[Union[int, float], Union[int, float]]] = None 152 # self.indexes_int: np.ndarray = None 153 # self.indexes_float: np.ndarray = None 154 155 self.checked_reports: List[Tuple[str, Callable[[array1D], None]]] = None 156 157 self.population_size: int = None 158 self.progress_stream = None 159 160 # input algorithm's parameters 161 162 assert isinstance(algorithm_parameters, (dict, AlgorithmParams)), ( 163 "algorithm_parameters must be dict or AlgorithmParams object" 164 ) 165 if not isinstance(algorithm_parameters, AlgorithmParams): 166 algorithm_parameters = AlgorithmParams.from_dict(algorithm_parameters) 167 algorithm_parameters.validate() 168 self.param = algorithm_parameters 169 170 self.crossover, self.real_mutation, self.discrete_mutation, self.selection = 
algorithm_parameters.get_CMS_funcs() 171 172 # dimension and area bounds 173 self.dim = int(dimension) 174 assert self.dim > 0, f"dimension count must be int and >0, gotten {dimension}" 175 176 if variable_type_mixed is not None: 177 warnings.warn( 178 f"argument variable_type_mixed is deprecated and will be removed at version 7\n " 179 f"use variable_type={tuple(variable_type_mixed)} instead" 180 ) 181 variable_type = variable_type_mixed 182 self._set_types_indexes(variable_type) # types indexes 183 self._set_var_boundaries(variable_type, variable_boundaries) # input variables' boundaries 184 185 # fix mutation probs 186 187 assert can_be_prob(self.param.mutation_probability) 188 self.prob_mut = self.param.mutation_probability 189 assert self.param.mutation_discrete_probability is None or can_be_prob(self.param.mutation_discrete_probability) 190 self.prob_mut_discrete = self.param.mutation_discrete_probability or self.prob_mut 191 192 if self.param.crossover_probability is not None: 193 warnings.warn( 194 f"crossover_probability is deprecated and will be removed in version 7. " 195 f"Reason: it's old and has no sense" 196 ) 197 198 ############################################################# 199 # input function 200 if function: 201 warnings.warn( 202 f"function is deprecated in init constructor and will be removed in version 7. " 203 f"Move this argument to run() method" 204 ) 205 self._check_function(function) 206 if function_timeout: 207 warnings.warn( 208 f"function_timeout is deprecated in init constructor and will be removed in version 7. " 209 f"Move this argument to run() method" 210 ) 211 self._check_function_timeout(function_timeout) 212 213 ############################################################# 214 215 self.population_size = int(self.param.population_size) 216 self._set_parents_count(self.param.parents_portion) 217 self._set_elit_count(self.population_size, self.param.elit_ratio) 218 assert self.parents_count >= self.elit_count, ( 219 f"\n number of parents ({self.parents_count}) " 220 f"must be greater than number of elits ({self.elit_count})" 221 ) 222 223 self._set_max_iterations() 224 225 self._set_report() 226 227 # specify this function to speed up or change default behaviour 228 self.fill_children: Optional[Callable[[array2D, int], None]] = None 229 """ 230 custom function which adds children for population POP 231 where POP[:parents_count] are parents lines and next lines are for children 232 """
initializes the GA object and performs main checks
Arguments:
- function: the given objective function to be minimized -- deprecated and moved to run() method
- dimension: the number of decision variables, the population samples dimension
- variable_type: string means the variable type for all variables, for mixed types use sequence of strings of type for each variable
- variable_boundaries: leave it None if variable_type is 'bool'; otherwise provide a sequence of tuples of length two as boundaries for each variable; the length of the array must be equal to dimension. For example, ([0,100], [0,200]) determines lower boundary 0 and upper boundary 100 for the first variable, and lower boundary 0 and upper boundary 200 for the second variable; dimension must be 2.
- variable_type_mixed -- deprecated
- function_timeout: if the given function does not provide output before function_timeout (unit is seconds), the algorithm raises an error. For example, when there is an infinite loop in the given function. `None` means disabling -- deprecated and moved to run()
- algorithm_parameters: AlgorithmParams object or usual dictionary with algorithm parameters; it is not mandatory to provide all possible parameters
Notes:
- This implementation minimizes the given objective function. For maximization you can multiply the function by -1 (for instance): the absolute value of the output would be the actual objective function
for more details and examples of implementation please visit: https://github.com/PasaOpasen/geneticalgorithm2
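As an illustration, a minimal construction might look like the sketch below. The variable layout, bounds, and parameter values are assumptions for the sketch (not defaults), and the class is assumed to be importable from the package root; parameter names follow the AlgorithmParams fields used in this module.

from geneticalgorithm2 import GeneticAlgorithm2

# three variables: one integer and two reals, each with its own bounds
model = GeneticAlgorithm2(
    dimension=3,
    variable_type=('int', 'real', 'real'),
    variable_boundaries=((0, 10), (-1.0, 1.0), (0.0, 5.0)),
    algorithm_parameters={  # a plain dict is converted to AlgorithmParams
        'population_size': 50,
        'parents_portion': 0.3,
        'elit_ratio': 0.02,
        'mutation_probability': 0.1,
    },
)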
72 @property 73 def needs_mutation(self) -> bool: 74 """whether the mutation is required""" 75 return self.prob_mut > 0 or self.prob_mut_discrete > 0
whether the mutation is required
custom function which adds children to population POP, where POP[:parents_count] are parent rows and the following rows are for children
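For example, a hypothetical fill_children hook could look like the sketch below; it is illustrative only and replaces the default crossover/mutation loop with naive parent averaging.

import numpy as np

def fill_children_by_average(pop: np.ndarray, parents_count: int) -> None:
    # hypothetical hook: fills pop[parents_count:] in place using the parent rows above it
    for k in range(parents_count, pop.shape[0]):
        i, j = np.random.choice(parents_count, size=2, replace=False)
        pop[k] = 0.5 * (pop[i] + pop[j])  # naive recombination, for illustration only

model.fill_children = fill_children_by_average  # `model` is a GeneticAlgorithm2 instance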
448 def run( 449 self, 450 no_plot: bool = False, 451 disable_printing: bool = False, 452 progress_bar_stream: Optional[str] = 'stdout', 453 454 # deprecated 455 disable_progress_bar: bool = False, 456 457 function: FunctionToMinimize = None, 458 function_timeout: Optional[float] = None, 459 460 set_function: SetFunctionToMinimize = None, 461 apply_function_to_parents: bool = False, 462 start_generation: GenerationConvertible = Generation(), 463 studEA: bool = False, 464 mutation_indexes: Optional[Iterable[int]] = None, 465 466 init_creator: Optional[CreatorFunc] = None, 467 init_oppositors: Optional[Sequence[OppositorFunc]] = None, 468 469 duplicates_oppositor: Optional[OppositorFunc] = None, 470 remove_duplicates_generation_step: Optional[int] = None, 471 472 revolution_oppositor: Optional[OppositorFunc] = None, 473 revolution_after_stagnation_step: Optional[int] = None, 474 revolution_part: float = 0.3, 475 476 population_initializer: Tuple[ 477 int, PopulationModifier 478 ] = get_population_initializer(select_best_of=1, local_optimization_step='never', local_optimizer=None), 479 480 stop_when_reached: Optional[float] = None, 481 callbacks: Optional[Sequence[SimpleCallbackFunc]] = None, 482 middle_callbacks: Optional[Sequence[MiddleCallbackFunc]] = None, #+ 483 time_limit_secs: Optional[float] = None, 484 save_last_generation_as: Optional[str] = None, 485 seed: Optional[int] = None 486 ) -> GAResult: 487 """ 488 runs optimization process 489 490 Args: 491 no_plot: do not plot results using matplotlib by default 492 493 disable_printing: do not print log info of optimization process (except progress bar) 494 495 progress_bar_stream: 'stdout', 'stderr' or None to disable progress bar; 496 disabling progress bar can speed up the optimization process in many cases 497 498 disable_progress_bar: deprecated 499 500 function: the given objective function (sample -> its score) to be minimized; 501 502 function_timeout: if the given function does not provide 503 output before function_timeout (unit is seconds) the algorithm raises error. 504 For example, when there is an infinite loop in the given function. 505 `None` means disabling 506 507 set_function: set function (all samples -> score per sample) to be used instead of usual function 508 (usually for optimization purposes) 509 510 apply_function_to_parents: whether to apply function to parents from previous generation (if it's needed), 511 it can be needed at working with games agents, but for other tasks will just waste time 512 513 start_generation: initial generation object of any `GenerationConvertible` type 514 515 studEA: using stud EA strategy (crossover with best object always) 516 517 mutation_indexes: indexes of dimensions where mutation can be performed (all dimensions by default) 518 519 init_creator: the function creates population samples. 520 By default -- random uniform for real variables and random uniform for int 521 init_oppositors: the list of oppositors creates oppositions for base population. No by default 522 523 duplicates_oppositor: oppositor for applying after duplicates removing. 524 By default -- using just random initializer from creator 525 remove_duplicates_generation_step: step for removing duplicates (have a sense with discrete tasks). 526 No by default 527 528 revolution_oppositor: oppositor for revolution time. No by default 529 revolution_after_stagnation_step: create revolution after this generations of stagnation. No by default 530 revolution_part: float, the part of generation to being oppose. 
By default is 0.3 531 532 population_initializer: object for actions at population initialization step 533 to create better start population. See doc 534 535 stop_when_reached: stop searching after reaching this value 536 (it can be potential minimum or something else) 537 538 callbacks: sequence of callback functions with structure: 539 (generation_number, report_list, last_population, last_scores) -> do some action 540 541 middle_callbacks: sequence of functions made by `MiddleCallback` class 542 543 time_limit_secs: limit time of working (in seconds); 544 `None` means no time limit (limit will be only for count of generation and so on) 545 546 save_last_generation_as: path to .npz file 547 for saving last_generation as numpy dictionary like 548 {'population': 2D-array, 'scores': 1D-array}; 549 None disables this option 550 551 seed: random seed (None if doesn't matter) 552 553 Returns: 554 `GAResult` object; 555 also fills the self.report and self.result with many report information 556 557 Notes: 558 - if `function_timeout` is enabled then `function` must be set 559 - it would be more logical to use params like `studEA` as an algorithm parameter, 560 but `run()`-way can be more convenient for real using 561 """ 562 563 if disable_progress_bar: 564 warnings.warn( 565 f"disable_progress_bar is deprecated and will be removed in version 7, " 566 f"use probress_bar_stream=None to disable progress bar" 567 ) 568 progress_bar_stream = None 569 570 enable_printing: bool = not disable_printing 571 572 start_generation = Generation.from_object(self.dim, start_generation) 573 574 assert is_current_gen_number(revolution_after_stagnation_step), "must be None or int and >0" 575 assert is_current_gen_number(remove_duplicates_generation_step), "must be None or int and >0" 576 assert can_be_prob(revolution_part), f"revolution_part must be in [0,1], not {revolution_part}" 577 assert stop_when_reached is None or isinstance(stop_when_reached, (int, float)) 578 assert isinstance(callbacks, collections.abc.Sequence) or callbacks is None, ( 579 "callbacks should be a list of callbacks functions" 580 ) 581 assert isinstance(middle_callbacks, collections.abc.Sequence) or middle_callbacks is None, ( 582 "middle_callbacks should be list of MiddleCallbacks functions" 583 ) 584 assert time_limit_secs is None or time_limit_secs > 0, 'time_limit_secs must be None of number > 0' 585 586 self._set_mutation_indexes(mutation_indexes) 587 from OppOpPopInit import set_seed 588 set_seed(seed) 589 590 # randomstate = np.random.default_rng(random.randint(0, 1000) if seed is None else seed) 591 # self.randomstate = randomstate 592 593 # using bool flag is faster than using empty function with generated string arguments 594 SHOW_PROGRESS = progress_bar_stream is not None 595 if SHOW_PROGRESS: 596 597 show_progress = lambda t, t2, s: self._progress(t, t2, status=s) 598 599 if progress_bar_stream == 'stdout': 600 self.progress_stream = sys.stdout 601 elif progress_bar_stream == 'stderr': 602 self.progress_stream = sys.stderr 603 else: 604 raise Exception( 605 f"wrong value {progress_bar_stream} of progress_bar_stream, must be 'stdout'/'stderr'/None" 606 ) 607 else: 608 show_progress = None 609 610 stop_by_val = ( 611 (lambda best_f: False) 612 if stop_when_reached is None 613 else (lambda best_f: best_f <= stop_when_reached) 614 ) 615 616 t: int = 0 617 count_stagnation: int = 0 618 pop: array2D = None 619 scores: array1D = None 620 621 #region CALLBACKS 622 623 def get_data(): 624 """ 625 returns all important data about model 
626 """ 627 return MiddleCallbackData( 628 last_generation=Generation(pop, scores), 629 current_generation=t, 630 report_list=self.report, 631 632 mutation_prob=self.prob_mut, 633 mutation_discrete_prob=self.prob_mut_discrete, 634 mutation=self.real_mutation, 635 mutation_discrete=self.discrete_mutation, 636 crossover=self.crossover, 637 selection=self.selection, 638 639 current_stagnation=count_stagnation, 640 max_stagnation=self.max_stagnations, 641 642 parents_portion=self.param.parents_portion, 643 elit_ratio=self.param.elit_ratio, 644 645 set_function=self.set_function, 646 647 reason_to_stop=reason_to_stop 648 ) 649 650 def set_data(data: MiddleCallbackData): 651 """ 652 sets data to model 653 """ 654 nonlocal pop, scores, count_stagnation, reason_to_stop 655 656 pop, scores = data.last_generation.variables, data.last_generation.scores 657 self.population_size = pop.shape[0] 658 659 self.param.parents_portion = data.parents_portion 660 self._set_parents_count(data.parents_portion) 661 662 self.param.elit_ratio = data.elit_ratio 663 self._set_elit_count(self.population_size, data.elit_ratio) 664 665 self.prob_mut = data.mutation_prob 666 self.prob_mut_discrete = data.mutation_discrete_prob 667 self.real_mutation = data.mutation 668 self.discrete_mutation = data.mutation_discrete 669 self.crossover = data.crossover 670 self.selection = data.selection 671 672 count_stagnation = data.current_stagnation 673 reason_to_stop = data.reason_to_stop 674 self.max_stagnations = data.max_stagnation 675 676 self.set_function = data.set_function 677 678 if not callbacks: 679 total_callback = lambda g, r, lp, ls: None 680 else: 681 def total_callback( 682 generation_number: int, 683 report_list: List[float], 684 last_population: array2D, 685 last_scores: array1D 686 ): 687 for cb in callbacks: 688 cb(generation_number, report_list, last_population, last_scores) 689 690 if not middle_callbacks: 691 total_middle_callback = lambda: None 692 else: 693 def total_middle_callback(): 694 """ 695 applies callbacks and sets new data if there is a sense 696 """ 697 data = get_data() 698 flag = False 699 for cb in middle_callbacks: 700 data, has_sense = cb(data) 701 if has_sense: 702 flag = True 703 if flag: 704 set_data(data) # update global date if there was real callback step 705 706 #endregion 707 708 start_time = time.time() 709 time_is_done = ( 710 (lambda: False) 711 if time_limit_secs is None 712 else (lambda: int(time.time() - start_time) >= time_limit_secs) 713 ) 714 715 # combine with deprecated parts 716 function = function or self.f 717 function_timeout = function_timeout or self.funtimeout 718 719 assert function or set_function, 'no function to minimize' 720 if function: 721 self._check_function(function) 722 if function_timeout: 723 self._check_function_timeout(function_timeout) 724 725 self.set_function = set_function or GeneticAlgorithm2.default_set_function(self.f) 726 727 #region Initial population, duplicates filter, revolutionary 728 729 pop_coef, initializer_func = population_initializer 730 731 # population creator by random or with oppositions 732 if init_creator is None: 733 734 from OppOpPopInit import SampleInitializers 735 736 # just uniform random 737 self.creator = SampleInitializers.Combined( 738 minimums=[v[0] for v in self.var_bounds], 739 maximums=[v[1] for v in self.var_bounds], 740 indexes=( 741 self.indexes_int, 742 self.indexes_float 743 ), 744 creator_initializers=( 745 SampleInitializers.RandomInteger, 746 SampleInitializers.Uniform 747 ) 748 ) 749 else: 750 assert 
callable(init_creator) 751 self.creator = init_creator 752 753 self.init_oppositors = init_oppositors 754 self.dup_oppositor = duplicates_oppositor 755 self.revolution_oppositor = revolution_oppositor 756 757 # event for removing duplicates 758 if remove_duplicates_generation_step is None: 759 def remover(pop: array2D, scores: array1D, gen: int) -> Tuple[ 760 array2D, 761 array1D 762 ]: 763 return pop, scores 764 else: 765 766 def without_dup(pop: array2D, scores: array1D): # returns population without dups 767 _, index_of_dups = np.unique(pop, axis=0, return_index=True) 768 return pop[index_of_dups], scores[index_of_dups], scores.size - index_of_dups.size 769 770 if self.dup_oppositor is None: # if there is no dup_oppositor, use random creator 771 def remover(pop: array2D, scores: array1D, gen: int) -> Tuple[ 772 array2D, 773 array1D 774 ]: 775 if gen % remove_duplicates_generation_step != 0: 776 return pop, scores 777 778 pp, sc, count_to_create = without_dup(pop, scores) # pop without dups 779 780 if count_to_create == 0: 781 if SHOW_PROGRESS: 782 show_progress(t, self.max_iterations, 783 f"GA is running...{t} gen from {self.max_iterations}. No dups!") 784 return pop, scores 785 786 pp2 = SampleInitializers.CreateSamples(self.creator, count_to_create) # new pop elements 787 pp2_scores = self.set_function(pp2) # new elements values 788 789 if SHOW_PROGRESS: 790 show_progress(t, self.max_iterations, 791 f"GA is running...{t} gen from {self.max_iterations}. Kill dups!") 792 793 new_pop = np.vstack((pp, pp2)) 794 new_scores = np.concatenate((sc, pp2_scores)) 795 796 args_to_sort = new_scores.argsort() 797 return new_pop[args_to_sort], new_scores[args_to_sort] 798 799 else: # using oppositors 800 assert callable(self.dup_oppositor) 801 802 def remover(pop: np.ndarray, scores: np.ndarray, gen: int) -> Tuple[ 803 np.ndarray, 804 np.ndarray 805 ]: 806 if gen % remove_duplicates_generation_step != 0: 807 return pop, scores 808 809 pp, sc, count_to_create = without_dup(pop, scores) # pop without dups 810 811 if count_to_create == 0: 812 if SHOW_PROGRESS: 813 show_progress(t, self.max_iterations, 814 f"GA is running...{t} gen from {self.max_iterations}. No dups!") 815 return pop, scores 816 817 if count_to_create > sc.size: 818 raise Exception( 819 f"Too many duplicates at generation {gen} ({count_to_create} > {sc.size}), cannot oppose" 820 ) 821 822 # oppose count_to_create worse elements 823 pp2 = OppositionOperators.Reflect(pp[-count_to_create:], self.dup_oppositor) # new pop elements 824 pp2_scores = self.set_function(pp2) # new elements values 825 826 if SHOW_PROGRESS: 827 show_progress(t, self.max_iterations, 828 f"GA is running...{t} gen from {self.max_iterations}. Kill dups!") 829 830 new_pop = np.vstack((pp, pp2)) 831 new_scores = np.concatenate((sc, pp2_scores)) 832 833 args_to_sort = new_scores.argsort() 834 return new_pop[args_to_sort], new_scores[args_to_sort] 835 836 # event for revolution 837 if revolution_after_stagnation_step is None: 838 def revolution(pop: array2D, scores: array1D, stagnation_count: int) -> Tuple[ 839 array2D, 840 array1D 841 ]: 842 return pop, scores 843 else: 844 if revolution_oppositor is None: 845 raise Exception( 846 f"How can I make revolution each {revolution_after_stagnation_step} stagnation steps " 847 f"if revolution_oppositor is None (not defined)?" 
848 ) 849 assert callable(revolution_oppositor) 850 851 from OppOpPopInit import OppositionOperators 852 853 def revolution(pop: array2D, scores: array1D, stagnation_count: int) -> Tuple[ 854 array2D, 855 array1D 856 ]: 857 if stagnation_count < revolution_after_stagnation_step: 858 return pop, scores 859 part = int(pop.shape[0]*revolution_part) 860 861 pp2 = OppositionOperators.Reflect(pop[-part:], self.revolution_oppositor) 862 pp2_scores = self.set_function(pp2) 863 864 combined = np.vstack((pop, pp2)) 865 combined_scores = np.concatenate((scores, pp2_scores)) 866 args = combined_scores.argsort() 867 868 if SHOW_PROGRESS: 869 show_progress(t, self.max_iterations, 870 f"GA is running...{t} gen from {self.max_iterations}. Revolution!") 871 872 args = args[:scores.size] 873 return combined[args], combined_scores[args] 874 875 #enregion 876 877 # 878 # 879 # START ALGORITHM LOGIC 880 # 881 # 882 883 # Report 884 self._clear_report() # clear old report objects 885 self._init_report() 886 887 # initialization of pop 888 889 if start_generation.variables is None: 890 891 from OppOpPopInit import init_population 892 893 real_pop_size = self.population_size * pop_coef 894 895 # pop = np.empty((real_pop_size, self.dim)) 896 scores = np.empty(real_pop_size) 897 898 pop = init_population( 899 samples_count=real_pop_size, 900 creator=self.creator, 901 oppositors=self.init_oppositors 902 ) 903 904 if self.funtimeout and self.funtimeout > 0: # perform simulation 905 906 time_counter = 0 907 908 for p in range(0, real_pop_size): 909 # simulation returns exception or func value -- check the time of evaluating 910 value, eval_time = self._simulate(pop[p]) 911 scores[p] = value 912 time_counter += eval_time 913 914 if enable_printing: 915 print( 916 f"\nSim: Average time of function evaluating (secs): " 917 f"{time_counter/real_pop_size} (total = {time_counter})\n" 918 ) 919 else: 920 921 eval_time = time.time() 922 scores = self.set_function(pop) 923 eval_time = time.time() - eval_time 924 if enable_printing: 925 print( 926 f"\nSet: Average time of function evaluating (secs): " 927 f"{eval_time/real_pop_size} (total = {eval_time})\n" 928 ) 929 930 else: 931 932 self.population_size = start_generation.variables.shape[0] 933 self._set_elit_count(self.population_size, self.param.elit_ratio) 934 self._set_parents_count(self.param.parents_portion) 935 936 pop = start_generation.variables 937 938 if start_generation.scores is None: 939 940 _time = time.time() 941 scores = self.set_function(pop) 942 _time = time.time() - _time 943 944 if enable_printing: 945 print( 946 f'\nFirst scores are made from gotten variables ' 947 f'(by {_time} secs, about {_time/scores.size} for each creature)\n' 948 ) 949 else: 950 scores = start_generation.scores 951 if enable_printing: 952 print(f"\nFirst scores are from gotten population\n") 953 954 # Initialization by select bests and local_descent 955 956 pop, scores = initializer_func(pop, scores) 957 958 # first sort 959 args_to_sort = scores.argsort() 960 pop = pop[args_to_sort] 961 scores = scores[args_to_sort] 962 self._update_report(scores) 963 964 self.population_size = scores.size 965 self.best_function = scores[0] 966 967 if enable_printing: 968 print( 969 f"Best score before optimization: {self.best_function}" 970 ) 971 972 t: int = 1 973 count_stagnation: int = 0 974 """iterations without progress""" 975 reason_to_stop: Optional[str] = None 976 977 # gets indexes of 2 parents to crossover 978 if studEA: 979 get_parents_inds = lambda: (0, random.randrange(1, 
self.parents_count)) 980 else: 981 get_parents_inds = lambda: random_indexes_pair(self.parents_count) 982 983 # while no reason to stop 984 while True: 985 986 if count_stagnation > self.max_stagnations: 987 reason_to_stop = f"limit of fails: {count_stagnation}" 988 elif t == self.max_iterations: 989 reason_to_stop = f'limit of iterations: {t}' 990 elif stop_by_val(self.best_function): 991 reason_to_stop = f"stop value reached: {self.best_function} <= {stop_when_reached}" 992 elif time_is_done(): 993 reason_to_stop = f'time is done: {time.time() - start_time} >= {time_limit_secs}' 994 995 if reason_to_stop is not None: 996 if SHOW_PROGRESS: 997 show_progress(t, self.max_iterations, f"GA is running... STOP! {reason_to_stop}") 998 break 999 1000 if scores[0] < self.best_function: # if there is progress 1001 count_stagnation = 0 1002 self.best_function = scores[0] 1003 else: 1004 count_stagnation += 1 1005 1006 if SHOW_PROGRESS: 1007 show_progress( 1008 t, 1009 self.max_iterations, 1010 f"GA is running...{t} gen from {self.max_iterations}...best value = {self.best_function}" 1011 ) 1012 1013 # Select parents 1014 1015 par: array2D = np.empty((self.parents_count, self.dim)) 1016 """samples chosen to create new samples""" 1017 par_scores: array1D = np.empty(self.parents_count) 1018 1019 elit_slice = slice(None, self.elit_count) 1020 """elit parents""" 1021 1022 # copy needs because the generation will be removed after parents selection 1023 par[elit_slice] = pop[elit_slice].copy() 1024 par_scores[elit_slice] = scores[elit_slice].copy() 1025 1026 new_par_inds = ( 1027 self.selection( 1028 scores[self.elit_count:], 1029 self.parents_count - self.elit_count 1030 ).astype(np.int16) + self.elit_count 1031 ) 1032 """non-elit parents indexes""" 1033 #new_par_inds = self.selection(scores, self.parents_count - self.elit_count).astype(np.int16) 1034 par_slice = slice(self.elit_count, self.parents_count) 1035 par[par_slice] = pop[new_par_inds].copy() 1036 par_scores[par_slice] = scores[new_par_inds].copy() 1037 1038 pop = np.empty((self.population_size, self.dim)) 1039 """new generation""" 1040 scores = np.empty(self.population_size) 1041 """new generation scores""" 1042 1043 parents_slice = slice(None, self.parents_count) 1044 pop[parents_slice] = par 1045 scores[parents_slice] = par_scores 1046 1047 if self.fill_children is None: # default fill children behaviour 1048 DO_MUTATION = self.needs_mutation 1049 for k in range(self.parents_count, self.population_size, 2): 1050 1051 r1, r2 = get_parents_inds() 1052 1053 pvar1 = pop[r1] # equal to par[r1], but better for cache 1054 pvar2 = pop[r2] 1055 1056 ch1, ch2 = self.crossover(pvar1, pvar2) 1057 1058 if DO_MUTATION: 1059 ch1 = self.mut(ch1) 1060 ch2 = self.mut_middle(ch2, pvar1, pvar2) 1061 1062 pop[k] = ch1 1063 pop[k+1] = ch2 1064 else: # custom behaviour 1065 self.fill_children(pop, self.parents_count) 1066 1067 if apply_function_to_parents: 1068 scores = self.set_function(pop) 1069 else: 1070 scores[self.parents_count:] = self.set_function(pop[self.parents_count:]) 1071 1072 # remove duplicates 1073 pop, scores = remover(pop, scores, t) 1074 # revolution 1075 pop, scores = revolution(pop, scores, count_stagnation) 1076 1077 # make population better 1078 args_to_sort = scores.argsort() 1079 pop = pop[args_to_sort] 1080 scores = scores[args_to_sort] 1081 self._update_report(scores) 1082 1083 # callback it 1084 total_callback(t, self.report, pop, scores) 1085 total_middle_callback() 1086 1087 t += 1 1088 1089 self.best_function = fast_min(scores[0], 
self.best_function) 1090 1091 last_generation = Generation(pop, scores) 1092 self.result = GAResult(last_generation) 1093 1094 if save_last_generation_as is not None: 1095 last_generation.save(save_last_generation_as) 1096 1097 if enable_printing: 1098 show = ' ' * 200 1099 sys.stdout.write( 1100 f'\r{show}\n' 1101 f'\r The best found solution:\n {pop[0]}' 1102 f'\n\n Objective function:\n {self.best_function}\n' 1103 f'\n Used generations: {len(self.report)}' 1104 f'\n Used time: {time.time() - start_time:.3g} seconds\n' 1105 ) 1106 sys.stdout.flush() 1107 1108 if not no_plot: 1109 self.plot_results() 1110 1111 if enable_printing: 1112 if reason_to_stop is not None and 'iterations' not in reason_to_stop: 1113 sys.stdout.write( 1114 f'\nWarning: GA is terminated because of {reason_to_stop}' 1115 ) 1116 1117 return self.result
runs optimization process
Arguments:
- no_plot: do not plot results using matplotlib by default
- disable_printing: do not print log info of optimization process (except progress bar)
- progress_bar_stream: 'stdout', 'stderr' or None to disable progress bar; disabling progress bar can speed up the optimization process in many cases
- disable_progress_bar: deprecated
- function: the given objective function (sample -> its score) to be minimized;
- function_timeout: if the given function does not provide output before function_timeout (unit is seconds), the algorithm raises an error. For example, when there is an infinite loop in the given function. `None` means disabling
- set_function: set function (all samples -> score per sample) to be used instead of usual function (usually for optimization purposes)
- apply_function_to_parents: whether to apply the function to parents from the previous generation (if it's needed); it can be needed when working with game agents, but for other tasks it will just waste time
- start_generation: initial generation object of any `GenerationConvertible` type
- studEA: using stud EA strategy (crossover with best object always)
- mutation_indexes: indexes of dimensions where mutation can be performed (all dimensions by default)
- init_creator: the function that creates population samples; by default, uniform random for real variables and random integers for int variables
- init_oppositors: the list of oppositors that create oppositions for the base population; none by default
- duplicates_oppositor: oppositor applied after duplicates removal; by default, just the random initializer from the creator is used
- remove_duplicates_generation_step: step (in generations) for removing duplicates (makes sense for discrete tasks); disabled by default
- revolution_oppositor: oppositor used for revolutions; disabled by default
- revolution_after_stagnation_step: perform a revolution after this many generations of stagnation; disabled by default
- revolution_part: float, the part of the generation to be opposed; 0.3 by default
- population_initializer: object for actions at population initialization step to create better start population. See doc
- stop_when_reached: stop searching after reaching this value (it can be potential minimum or something else)
- callbacks: sequence of callback functions with structure: (generation_number, report_list, last_population, last_scores) -> do some action
- middle_callbacks: sequence of functions made by `MiddleCallback` class
- time_limit_secs: limit of working time (in seconds); `None` means no time limit (the limit will be only by count of generations and so on)
- save_last_generation_as: path to .npz file for saving last_generation as numpy dictionary like {'population': 2D-array, 'scores': 1D-array}; None disables this option
- seed: random seed (None if doesn't matter)
Returns:
`GAResult` object; also fills self.report and self.result with report information
Notes:
- if `function_timeout` is enabled then `function` must be set
- it would be more logical to use params like `studEA` as algorithm parameters, but the `run()` way can be more convenient for real use
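As an illustration, a run might look like the sketch below; the objective, callback, and settings are assumptions, and `model` is a configured GeneticAlgorithm2 instance as shown earlier.

import numpy as np

def sphere(x: np.ndarray) -> float:
    # illustrative objective: sum of squares, minimum at the origin
    return float(np.sum(x ** 2))

def log_progress(generation_number, report_list, last_population, last_scores):
    # simple callback matching the documented signature; prints the best score recorded so far
    if generation_number % 50 == 0:
        print(f"gen {generation_number}: best so far = {report_list[-1]}")

result = model.run(
    function=sphere,
    no_plot=True,
    progress_bar_stream=None,
    stop_when_reached=1e-8,
    time_limit_secs=60,
    callbacks=[log_progress],
    seed=42,
)
print(result.last_generation.variables[0], result.last_generation.scores[0])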
1123 def plot_results( 1124 self, 1125 title: str = 'Genetic Algorithm', 1126 save_as: Optional[str] = None, 1127 dpi: int = 200, 1128 main_color: str = 'blue' 1129 ): 1130 """ 1131 Simple plot of self.report (if not empty) 1132 """ 1133 if len(self.report) == 0: 1134 sys.stdout.write("No results to plot!\n") 1135 return 1136 1137 plot_several_lines( 1138 lines=[self.report], 1139 colors=[main_color], 1140 labels=['best of generation'], 1141 linewidths=[2], 1142 title=title, 1143 xlabel='Generation', 1144 ylabel='Minimized function', 1145 save_as=save_as, 1146 dpi=dpi 1147 )
Simple plot of self.report (if not empty)
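For instance, after a finished run (a sketch; the file name is arbitrary):

model.plot_results(title='Convergence', save_as='convergence.png', dpi=150)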
1149 def plot_generation_scores(self, title: str = 'Last generation scores', save_as: Optional[str] = None): 1150 """ 1151 Plots barplot of scores of last population 1152 """ 1153 1154 if not hasattr(self, 'result'): 1155 raise Exception( 1156 "There is no 'result' field into ga object! Before plotting generation u need to run seaching process" 1157 ) 1158 1159 plot_pop_scores(self.result.last_generation.scores, title, save_as)
Plots barplot of scores of last population
1164 def mut(self, x: array1D): 1165 """ 1166 just mutation 1167 """ 1168 1169 for i in self.indexes_int_mut: 1170 if random.random() < self.prob_mut_discrete: 1171 x[i] = self.discrete_mutation(x[i], *self.var_bounds[i]) 1172 1173 for i in self.indexes_float_mut: 1174 if random.random() < self.prob_mut: 1175 x[i] = self.real_mutation(x[i], *self.var_bounds[i]) 1176 1177 return x
just mutation
1179 def mut_middle(self, x: array1D, p1: array1D, p2: array1D): 1180 """ 1181 mutation oriented on parents 1182 """ 1183 for i in self.indexes_int_mut: 1184 1185 if random.random() < self.prob_mut_discrete: 1186 v1, v2 = p1[i], p2[i] 1187 if v1 < v2: 1188 x[i] = random.randint(v1, v2) 1189 elif v1 > v2: 1190 x[i] = random.randint(v2, v1) 1191 else: 1192 x[i] = random.randint(*self.var_bounds[i]) 1193 1194 for i in self.indexes_float_mut: 1195 if random.random() < self.prob_mut: 1196 v1, v2 = p1[i], p2[i] 1197 if v1 != v2: 1198 x[i] = random.uniform(v1, v2) 1199 else: 1200 x[i] = random.uniform(*self.var_bounds[i]) 1201 return x
mutation oriented on parents
1207 @staticmethod 1208 def default_set_function(function_for_set: Callable[[array1D], float]): 1209 """ 1210 simple function for creating set_function 1211 function_for_set just applies to each row of population 1212 """ 1213 def func(matrix: array2D): 1214 return np.array( 1215 [function_for_set(row) for row in matrix] 1216 ) 1217 return func
simple function for creating set_function; function_for_set just applies to each row of population
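A small sketch of how the wrapper behaves (the per-row objective is illustrative):

import numpy as np
from geneticalgorithm2 import GeneticAlgorithm2

set_func = GeneticAlgorithm2.default_set_function(lambda row: float(np.sum(row ** 2)))
population = np.array([[0.0, 1.0], [2.0, 2.0]])
print(set_func(population))  # one score per row -> [1. 8.]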
1219 @staticmethod 1220 def vectorized_set_function(function_for_set: Callable[[array1D], float]): 1221 """ 1222 works like default, but faster for big populations and slower for little 1223 function_for_set just applyes to each row of population 1224 """ 1225 1226 func = np.vectorize(function_for_set, signature='(n)->()') 1227 return func
works like default_set_function, but faster for big populations and slower for small ones; function_for_set just applies to each row of population
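The vectorized variant is a drop-in alternative built on np.vectorize; a minimal sketch with an illustrative per-row objective:

import numpy as np
from geneticalgorithm2 import GeneticAlgorithm2

vec_func = GeneticAlgorithm2.vectorized_set_function(lambda row: float(row.sum()))
print(vec_func(np.arange(12.0).reshape(4, 3)))  # row sums -> [ 3. 12. 21. 30.]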
1229 @staticmethod 1230 def set_function_multiprocess(function_for_set: Callable[[array1D], float], n_jobs: int = -1): 1231 """ 1232 like function_for_set but uses joblib with n_jobs (-1 goes to count of available processors) 1233 """ 1234 try: 1235 from joblib import Parallel, delayed 1236 except ModuleNotFoundError: 1237 raise ModuleNotFoundError( 1238 "this additional feature requires joblib package," 1239 "run `pip install joblib` or `pip install --upgrade geneticalgorithm2[full]`" 1240 "or use another set function" 1241 ) 1242 1243 def func(matrix: array2D): 1244 result = Parallel(n_jobs=n_jobs)(delayed(function_for_set)(matrix[i]) for i in range(matrix.shape[0])) 1245 return np.array(result) 1246 return func
like default_set_function but uses joblib with n_jobs (-1 means the count of available processors)
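A sketch of the joblib-backed variant for expensive objectives (requires `pip install joblib`; the objective and the commented run() call are assumptions):

import numpy as np
from geneticalgorithm2 import GeneticAlgorithm2

def expensive_objective(x: np.ndarray) -> float:
    # stand-in for a costly simulation
    return float(np.sum(x ** 2))

parallel_set = GeneticAlgorithm2.set_function_multiprocess(expensive_objective, n_jobs=-1)
# result = model.run(set_function=parallel_set, no_plot=True)  # hypothetical usage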