geneticalgorithm2.geneticalgorithm2
1from typing import Callable, List, Tuple, Optional, Dict, Any, Union, Sequence, Literal, Iterable 2from typing_extensions import TypeAlias 3 4import collections 5import warnings 6import operator 7 8import sys 9import time 10import random 11import math 12 13import numpy as np 14 15from OppOpPopInit.initialiser import CreatorFunc 16from OppOpPopInit.oppositor import OppositorFunc 17 18#region INTERNAL IMPORTS 19 20from .utils.aliases import array1D, array2D 21 22from .data_types.aliases import FunctionToMinimize, SetFunctionToMinimize 23from .data_types.algorithm_params import AlgorithmParams 24from .data_types.generation import GenerationConvertible, Generation 25from .data_types.result import GAResult 26 27from .population_initializer import get_population_initializer, PopulationModifier 28from .utils.plotting import plot_pop_scores, plot_several_lines 29 30from .utils.funcs import can_be_prob, is_numpy, is_current_gen_number, fast_min, random_indexes_pair 31 32from .callbacks.data import MiddleCallbackData 33from .callbacks import MiddleCallbackFunc, SimpleCallbackFunc 34 35#endregion 36 37#region ALIASES 38 39VARIABLE_TYPE: TypeAlias = Literal['int', 'real', 'bool'] 40""" 41the variable type for a given or all dimension, determines the values discretion: 42 real: double numbers 43 int: integer number only 44 bool: in the fact is integer with bounds [0, 1] 45""" 46 47#endregion 48 49 50class GeneticAlgorithm2: 51 """ 52 Genetic algorithm optimization process 53 """ 54 55 default_params = AlgorithmParams() 56 PROGRESS_BAR_LEN = 20 57 """max count of symbols in the progress bar""" 58 59 @property 60 def output_dict(self): 61 warnings.warn( 62 "'output_dict' is deprecated and will be removed at version 7 \n" 63 "use 'result' instead" 64 ) 65 return self.result 66 67 @property 68 def needs_mutation(self) -> bool: 69 """whether the mutation is required""" 70 return self.prob_mut > 0 or self.prob_mut_discrete > 0 71 72 #region INIT 73 74 def __init__( 75 self, 76 
function: FunctionToMinimize = None, 77 78 dimension: int = 0, 79 variable_type: Union[VARIABLE_TYPE, Sequence[VARIABLE_TYPE]] = 'bool', 80 variable_boundaries: Optional[Union[array2D, Sequence[Tuple[float, float]]]] = None, 81 82 variable_type_mixed=None, 83 84 function_timeout: Optional[float] = None, 85 algorithm_parameters: Union[AlgorithmParams, Dict[str, Any]] = default_params 86 ): 87 """ 88 initializes the GA object and performs main checks 89 90 Args: 91 function: the given objective function to be minimized -- deprecated and moved to run() method 92 dimension: the number of decision variables, the population samples dimension 93 94 variable_type: string means the variable type for all variables, 95 for mixed types use sequence of strings of type for each variable 96 97 variable_boundaries: leave it None if variable_type is 'bool'; 98 otherwise provide a sequence of tuples of length two as boundaries for each variable; 99 the length of the array must be equal dimension. 100 For example, ([0,100], [0,200]) determines 101 lower boundary 0 and upper boundary 100 for first 102 and upper boundary 200 for second variable 103 and dimension must be 2. 104 105 variable_type_mixed -- deprecated 106 107 function_timeout: if the given function does not provide 108 output before function_timeout (unit is seconds) the algorithm raises error. 109 For example, when there is an infinite loop in the given function. 110 `None` means disabling -- deprecated and moved to run() 111 112 algorithm_parameters: AlgorithmParams object or usual dictionary with algorithm parameter; 113 it is not mandatory to provide all possible parameters 114 115 Notes: 116 - This implementation minimizes the given objective function. 
117 For maximization u can multiply the function by -1 (for instance): the absolute 118 value of the output would be the actual objective function 119 120 for more details and examples of implementation please visit: 121 https://github.com/PasaOpasen/geneticalgorithm2 122 123 """ 124 125 # all default fields 126 127 # self.crossover: Callable[[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray]] = None 128 # self.real_mutation: Callable[[float, float, float], float] = None 129 # self.discrete_mutation: Callable[[int, int, int], int] = None 130 # self.selection: Callable[[np.ndarray, int], np.ndarray] = None 131 132 self.result = None 133 self.best_function = None 134 135 self.revolution_oppositor = None 136 self.dup_oppositor = None 137 self.creator = None 138 self.init_oppositors = None 139 140 self.f: Callable[[array1D], float] = None 141 self.funtimeout: float = None 142 143 self.set_function: Callable[[np.ndarray], np.ndarray] = None 144 145 # self.dim: int = None 146 self.var_bounds: List[Tuple[Union[int, float], Union[int, float]]] = None 147 # self.indexes_int: np.ndarray = None 148 # self.indexes_float: np.ndarray = None 149 150 self.checked_reports: List[Tuple[str, Callable[[array1D], None]]] = None 151 152 self.population_size: int = None 153 self.progress_stream = None 154 155 # input algorithm's parameters 156 157 assert isinstance(algorithm_parameters, (dict, AlgorithmParams)), ( 158 "algorithm_parameters must be dict or AlgorithmParams object" 159 ) 160 if not isinstance(algorithm_parameters, AlgorithmParams): 161 algorithm_parameters = AlgorithmParams.from_dict(algorithm_parameters) 162 algorithm_parameters.validate() 163 self.param = algorithm_parameters 164 165 self.crossover, self.real_mutation, self.discrete_mutation, self.selection = algorithm_parameters.get_CMS_funcs() 166 167 # dimension and area bounds 168 self.dim = int(dimension) 169 assert self.dim > 0, f"dimension count must be int and >0, gotten {dimension}" 170 171 if 
variable_type_mixed is not None: 172 warnings.warn( 173 f"argument variable_type_mixed is deprecated and will be removed at version 7\n " 174 f"use variable_type={tuple(variable_type_mixed)} instead" 175 ) 176 variable_type = variable_type_mixed 177 self._set_types_indexes(variable_type) # types indexes 178 self._set_var_boundaries(variable_type, variable_boundaries) # input variables' boundaries 179 180 # fix mutation probs 181 182 assert can_be_prob(self.param.mutation_probability) 183 self.prob_mut = self.param.mutation_probability 184 assert self.param.mutation_discrete_probability is None or can_be_prob(self.param.mutation_discrete_probability) 185 self.prob_mut_discrete = self.param.mutation_discrete_probability or self.prob_mut 186 187 if self.param.crossover_probability is not None: 188 warnings.warn( 189 f"crossover_probability is deprecated and will be removed in version 7. " 190 f"Reason: it's old and has no sense" 191 ) 192 193 ############################################################# 194 # input function 195 if function: 196 warnings.warn( 197 f"function is deprecated in init constructor and will be removed in version 7. " 198 f"Move this argument to run() method" 199 ) 200 self._check_function(function) 201 if function_timeout: 202 warnings.warn( 203 f"function_timeout is deprecated in init constructor and will be removed in version 7. 
" 204 f"Move this argument to run() method" 205 ) 206 self._check_function_timeout(function_timeout) 207 208 ############################################################# 209 210 self.population_size = int(self.param.population_size) 211 self._set_parents_count(self.param.parents_portion) 212 self._set_elit_count(self.population_size, self.param.elit_ratio) 213 assert self.parents_count >= self.elit_count, ( 214 f"\n number of parents ({self.parents_count}) " 215 f"must be greater than number of elits ({self.elit_count})" 216 ) 217 218 self._set_max_iterations() 219 220 self._set_report() 221 222 # specify this function to speed up or change default behaviour 223 self.fill_children: Optional[Callable[[array2D, int], None]] = None 224 """ 225 custom function which adds children for population POP 226 where POP[:parents_count] are parents lines and next lines are for children 227 """ 228 229 def _set_types_indexes(self, variable_type: Union[str, Sequence[str]]): 230 231 indexes = np.arange(self.dim) 232 self.indexes_int = np.array([]) 233 self.indexes_float = np.array([]) 234 235 assert_message = ( 236 f"\n variable_type must be 'bool', 'int', 'real' or a sequence with 'int' and 'real', got {variable_type}" 237 ) 238 239 if isinstance(variable_type, str): 240 assert (variable_type in VARIABLE_TYPE.__args__), assert_message 241 if variable_type == 'real': 242 self.indexes_float = indexes 243 else: 244 self.indexes_int = indexes 245 246 else: # sequence case 247 248 assert len(variable_type) == self.dim, ( 249 f"\n variable_type must have a length equal dimension. 
" 250 f"Should be {self.dim}, got {len(variable_type)}" 251 ) 252 assert 'bool' not in variable_type, ( 253 "don't use 'bool' if variable_type is a sequence, " 254 "for 'boolean' case use 'int' and specify boundary as (0,1)" 255 ) 256 assert all(v in VARIABLE_TYPE.__args__ for v in variable_type), assert_message 257 258 vartypes = np.array(variable_type) 259 self.indexes_int = indexes[vartypes == 'int'] 260 self.indexes_float = indexes[vartypes == 'real'] 261 262 def _set_var_boundaries( 263 self, 264 variable_type: Union[str, Sequence[str]], 265 variable_boundaries 266 ): 267 if isinstance(variable_type, str) and variable_type == 'bool': 268 self.var_bounds = [(0, 1)] * self.dim 269 else: 270 271 if is_numpy(variable_boundaries): 272 assert variable_boundaries.shape == (self.dim, 2), ( 273 f"\n if variable_boundaries is numpy array, it must be with shape (dim, 2)" 274 ) 275 else: 276 assert len(variable_boundaries) == self.dim and all((len(t) == 2 for t in variable_boundaries)), ( 277 "\n if variable_boundaries is sequence, " 278 "it must be with len dim and boundary for each variable must be a tuple of length two" 279 ) 280 281 for i in variable_boundaries: 282 assert i[0] <= i[1], "\n lower_boundaries must be smaller than upper_boundaries [lower,upper]" 283 284 self.var_bounds = [(i[0], i[1]) for i in variable_boundaries] 285 286 def _set_parents_count(self, parents_portion: float): 287 288 self.parents_count = int(parents_portion * self.population_size) 289 assert self.population_size >= self.parents_count > 1, ( 290 f'parents count {self.parents_count} cannot be less than population size {self.population_size}' 291 ) 292 trl = self.population_size - self.parents_count 293 if trl % 2 != 0: 294 self.parents_count += 1 295 296 def _set_elit_count(self, pop_size: int, elit_ratio: Union[float, int]): 297 298 assert elit_ratio >= 0 299 self.elit_count = elit_ratio if isinstance(elit_ratio, str) else math.ceil(pop_size*elit_ratio) 300 301 def 
_set_max_iterations(self): 302 303 if self.param.max_num_iteration is None: 304 iterate = 0 305 for i in range(0, self.dim): 306 bound_min, bound_max = self.var_bounds[i] 307 var_space = bound_max - bound_min 308 if i in self.indexes_int: 309 iterate += var_space * self.dim * (100 / self.population_size) 310 else: 311 iterate += var_space * 50 * (100 / self.population_size) 312 iterate = int(iterate) 313 if (iterate * self.population_size) > 10000000: 314 iterate = 10000000 / self.population_size 315 316 self.max_iterations = fast_min(iterate, 8000) 317 else: 318 assert self.param.max_num_iteration > 0 319 self.max_iterations = math.ceil(self.param.max_num_iteration) 320 321 max_it = self.param.max_iteration_without_improv 322 if max_it is None: 323 self.max_stagnations = self.max_iterations + 1 324 else: 325 self.max_stagnations = math.ceil(max_it) 326 327 def _check_function(self, function: Callable[[array1D], float]): 328 assert callable(function), "function must be callable!" 329 self.f = function 330 331 def _check_function_timeout(self, function_timeout: Optional[float]): 332 333 if function_timeout is not None and function_timeout > 0: 334 try: 335 from func_timeout import func_timeout, FunctionTimedOut 336 except ModuleNotFoundError: 337 raise ModuleNotFoundError( 338 "function_timeout > 0 needs additional package func_timeout\n" 339 "run `python -m pip install func_timeout`\n" 340 "or disable this parameter: function_timeout=None" 341 ) 342 343 self.funtimeout = None if function_timeout is None else float(function_timeout) 344 345 #endregion 346 347 #region REPORT 348 349 def _set_report(self): 350 """ 351 creates default report checker 352 """ 353 self.checked_reports = [ 354 # item 0 cuz scores will be sorted and min item is items[0] 355 ('report', operator.itemgetter(0)) 356 ] 357 358 def _clear_report(self): 359 """ 360 removes all report objects 361 """ 362 fields = [f for f in vars(self).keys() if f.startswith('report')] 363 for attr in fields: 364 
delattr(self, attr) 365 366 def _init_report(self): 367 """ 368 makes empty report fields 369 """ 370 for name, _ in self.checked_reports: 371 setattr(self, name, []) 372 373 def _update_report(self, scores: array1D): 374 """ 375 append report value to the end of field 376 """ 377 for name, func in self.checked_reports: 378 getattr(self, name).append( 379 func(scores) 380 ) 381 382 #endregion 383 384 #region RUN METHODS 385 386 def _progress(self, count: int, total: int, status: str = ''): 387 388 part = count / total 389 390 filled_len = round(GeneticAlgorithm2.PROGRESS_BAR_LEN * part) 391 percents = round(100.0 * part, 1) 392 bar = '|' * filled_len + '_' * (GeneticAlgorithm2.PROGRESS_BAR_LEN - filled_len) 393 394 self.progress_stream.write('\r%s %s%s %s' % (bar, percents, '%', status)) 395 self.progress_stream.flush() 396 397 def __str__(self): 398 return f"Genetic algorithm object with parameters {self.param}" 399 400 def __repr__(self): 401 return self.__str__() 402 403 def _simulate(self, sample: array1D): 404 405 from func_timeout import func_timeout, FunctionTimedOut 406 407 obj = None 408 eval_time = time.time() 409 try: 410 obj = func_timeout( 411 self.funtimeout, 412 lambda: self.f(sample) 413 ) 414 except FunctionTimedOut: 415 print("given function is not applicable") 416 eval_time = time.time() - eval_time 417 418 assert obj is not None, ( 419 f"the given function was running like {eval_time} seconds and does not provide any output" 420 ) 421 422 tp = type(obj) 423 assert ( 424 tp in (int, float) or np.issubdtype(tp, np.floating) or np.issubdtype(tp, np.integer) 425 ), f"Minimized function should return a number, but got '{obj}' object with type {tp}" 426 427 return obj, eval_time 428 429 def _set_mutation_indexes(self, mutation_indexes: Optional[Iterable[int]]): 430 431 if mutation_indexes is None: 432 self.indexes_float_mut = self.indexes_float 433 self.indexes_int_mut = self.indexes_int 434 else: 435 tmp_indexes = set(mutation_indexes) 436 
self.indexes_int_mut = np.array(list(tmp_indexes.intersection(self.indexes_int))) 437 self.indexes_float_mut = np.array(list(tmp_indexes.intersection(self.indexes_float))) 438 439 if self.indexes_float_mut.size == 0 and self.indexes_int_mut.size == 0: 440 warnings.warn(f"No mutation dimensions!!! Check ur mutation indexes!!") 441 442 #@profile 443 def run( 444 self, 445 no_plot: bool = False, 446 disable_printing: bool = False, 447 progress_bar_stream: Optional[str] = 'stdout', 448 449 # deprecated 450 disable_progress_bar: bool = False, 451 452 function: FunctionToMinimize = None, 453 function_timeout: Optional[float] = None, 454 455 set_function: SetFunctionToMinimize = None, 456 apply_function_to_parents: bool = False, 457 start_generation: GenerationConvertible = Generation(), 458 studEA: bool = False, 459 mutation_indexes: Optional[Iterable[int]] = None, 460 461 init_creator: Optional[CreatorFunc] = None, 462 init_oppositors: Optional[Sequence[OppositorFunc]] = None, 463 464 duplicates_oppositor: Optional[OppositorFunc] = None, 465 remove_duplicates_generation_step: Optional[int] = None, 466 467 revolution_oppositor: Optional[OppositorFunc] = None, 468 revolution_after_stagnation_step: Optional[int] = None, 469 revolution_part: float = 0.3, 470 471 population_initializer: Tuple[ 472 int, PopulationModifier 473 ] = get_population_initializer(select_best_of=1, local_optimization_step='never', local_optimizer=None), 474 475 stop_when_reached: Optional[float] = None, 476 callbacks: Optional[Sequence[SimpleCallbackFunc]] = None, 477 middle_callbacks: Optional[Sequence[MiddleCallbackFunc]] = None, #+ 478 time_limit_secs: Optional[float] = None, 479 save_last_generation_as: Optional[str] = None, 480 seed: Optional[int] = None 481 ) -> GAResult: 482 """ 483 runs optimization process 484 485 Args: 486 no_plot: do not plot results using matplotlib by default 487 488 disable_printing: do not print log info of optimization process (except progress bar) 489 490 
progress_bar_stream: 'stdout', 'stderr' or None to disable progress bar; 491 disabling progress bar can speed up the optimization process in many cases 492 493 disable_progress_bar: deprecated 494 495 function: the given objective function (sample -> its score) to be minimized; 496 497 function_timeout: if the given function does not provide 498 output before function_timeout (unit is seconds) the algorithm raises error. 499 For example, when there is an infinite loop in the given function. 500 `None` means disabling 501 502 set_function: set function (all samples -> score per sample) to be used instead of usual function 503 (usually for optimization purposes) 504 505 apply_function_to_parents: whether to apply function to parents from previous generation (if it's needed), 506 it can be needed at working with games agents, but for other tasks will just waste time 507 508 start_generation: initial generation object of any `GenerationConvertible` type 509 510 studEA: using stud EA strategy (crossover with best object always) 511 512 mutation_indexes: indexes of dimensions where mutation can be performed (all dimensions by default) 513 514 init_creator: the function creates population samples. 515 By default -- random uniform for real variables and random uniform for int 516 init_oppositors: the list of oppositors creates oppositions for base population. No by default 517 518 duplicates_oppositor: oppositor for applying after duplicates removing. 519 By default -- using just random initializer from creator 520 remove_duplicates_generation_step: step for removing duplicates (have a sense with discrete tasks). 521 No by default 522 523 revolution_oppositor: oppositor for revolution time. No by default 524 revolution_after_stagnation_step: create revolution after this generations of stagnation. No by default 525 revolution_part: float, the part of generation to being oppose. 
By default is 0.3 526 527 population_initializer: object for actions at population initialization step 528 to create better start population. See doc 529 530 stop_when_reached: stop searching after reaching this value 531 (it can be potential minimum or something else) 532 533 callbacks: sequence of callback functions with structure: 534 (generation_number, report_list, last_population, last_scores) -> do some action 535 536 middle_callbacks: sequence of functions made by `MiddleCallback` class 537 538 time_limit_secs: limit time of working (in seconds); 539 `None` means no time limit (limit will be only for count of generation and so on) 540 541 save_last_generation_as: path to .npz file 542 for saving last_generation as numpy dictionary like 543 {'population': 2D-array, 'scores': 1D-array}; 544 None disables this option 545 546 seed: random seed (None if doesn't matter) 547 548 Returns: 549 `GAResult` object; 550 also fills the self.report and self.result with many report information 551 552 Notes: 553 - if `function_timeout` is enabled then `function` must be set 554 - it would be more logical to use params like `studEA` as an algorithm parameter, 555 but `run()`-way can be more convenient for real using 556 """ 557 558 if disable_progress_bar: 559 warnings.warn( 560 f"disable_progress_bar is deprecated and will be removed in version 7, " 561 f"use probress_bar_stream=None to disable progress bar" 562 ) 563 progress_bar_stream = None 564 565 enable_printing: bool = not disable_printing 566 567 start_generation = Generation.from_object(self.dim, start_generation) 568 569 assert is_current_gen_number(revolution_after_stagnation_step), "must be None or int and >0" 570 assert is_current_gen_number(remove_duplicates_generation_step), "must be None or int and >0" 571 assert can_be_prob(revolution_part), f"revolution_part must be in [0,1], not {revolution_part}" 572 assert stop_when_reached is None or isinstance(stop_when_reached, (int, float)) 573 assert 
isinstance(callbacks, collections.abc.Sequence) or callbacks is None, ( 574 "callbacks should be a list of callbacks functions" 575 ) 576 assert isinstance(middle_callbacks, collections.abc.Sequence) or middle_callbacks is None, ( 577 "middle_callbacks should be list of MiddleCallbacks functions" 578 ) 579 assert time_limit_secs is None or time_limit_secs > 0, 'time_limit_secs must be None of number > 0' 580 581 self._set_mutation_indexes(mutation_indexes) 582 from OppOpPopInit import set_seed 583 set_seed(seed) 584 585 # randomstate = np.random.default_rng(random.randint(0, 1000) if seed is None else seed) 586 # self.randomstate = randomstate 587 588 # using bool flag is faster than using empty function with generated string arguments 589 SHOW_PROGRESS = progress_bar_stream is not None 590 if SHOW_PROGRESS: 591 592 show_progress = lambda t, t2, s: self._progress(t, t2, status=s) 593 594 if progress_bar_stream == 'stdout': 595 self.progress_stream = sys.stdout 596 elif progress_bar_stream == 'stderr': 597 self.progress_stream = sys.stderr 598 else: 599 raise Exception( 600 f"wrong value {progress_bar_stream} of progress_bar_stream, must be 'stdout'/'stderr'/None" 601 ) 602 else: 603 show_progress = None 604 605 stop_by_val = ( 606 (lambda best_f: False) 607 if stop_when_reached is None 608 else (lambda best_f: best_f <= stop_when_reached) 609 ) 610 611 t: int = 0 612 count_stagnation: int = 0 613 pop: array2D = None 614 scores: array1D = None 615 616 #region CALLBACKS 617 618 def get_data(): 619 """ 620 returns all important data about model 621 """ 622 return MiddleCallbackData( 623 last_generation=Generation(pop, scores), 624 current_generation=t, 625 report_list=self.report, 626 627 mutation_prob=self.prob_mut, 628 mutation_discrete_prob=self.prob_mut_discrete, 629 mutation=self.real_mutation, 630 mutation_discrete=self.discrete_mutation, 631 crossover=self.crossover, 632 selection=self.selection, 633 634 current_stagnation=count_stagnation, 635 
max_stagnation=self.max_stagnations, 636 637 parents_portion=self.param.parents_portion, 638 elit_ratio=self.param.elit_ratio, 639 640 set_function=self.set_function, 641 642 reason_to_stop=reason_to_stop 643 ) 644 645 def set_data(data: MiddleCallbackData): 646 """ 647 sets data to model 648 """ 649 nonlocal pop, scores, count_stagnation, reason_to_stop 650 651 pop, scores = data.last_generation.variables, data.last_generation.scores 652 self.population_size = pop.shape[0] 653 654 self.param.parents_portion = data.parents_portion 655 self._set_parents_count(data.parents_portion) 656 657 self.param.elit_ratio = data.elit_ratio 658 self._set_elit_count(self.population_size, data.elit_ratio) 659 660 self.prob_mut = data.mutation_prob 661 self.prob_mut_discrete = data.mutation_discrete_prob 662 self.real_mutation = data.mutation 663 self.discrete_mutation = data.mutation_discrete 664 self.crossover = data.crossover 665 self.selection = data.selection 666 667 count_stagnation = data.current_stagnation 668 reason_to_stop = data.reason_to_stop 669 self.max_stagnations = data.max_stagnation 670 671 self.set_function = data.set_function 672 673 if not callbacks: 674 total_callback = lambda g, r, lp, ls: None 675 else: 676 def total_callback( 677 generation_number: int, 678 report_list: List[float], 679 last_population: array2D, 680 last_scores: array1D 681 ): 682 for cb in callbacks: 683 cb(generation_number, report_list, last_population, last_scores) 684 685 if not middle_callbacks: 686 total_middle_callback = lambda: None 687 else: 688 def total_middle_callback(): 689 """ 690 applies callbacks and sets new data if there is a sense 691 """ 692 data = get_data() 693 flag = False 694 for cb in middle_callbacks: 695 data, has_sense = cb(data) 696 if has_sense: 697 flag = True 698 if flag: 699 set_data(data) # update global date if there was real callback step 700 701 #endregion 702 703 start_time = time.time() 704 time_is_done = ( 705 (lambda: False) 706 if time_limit_secs 
is None 707 else (lambda: int(time.time() - start_time) >= time_limit_secs) 708 ) 709 710 # combine with deprecated parts 711 function = function or self.f 712 function_timeout = function_timeout or self.funtimeout 713 714 assert function or set_function, 'no function to minimize' 715 if function: 716 self._check_function(function) 717 if function_timeout: 718 self._check_function_timeout(function_timeout) 719 720 self.set_function = set_function or GeneticAlgorithm2.default_set_function(self.f) 721 722 #region Initial population, duplicates filter, revolutionary 723 724 pop_coef, initializer_func = population_initializer 725 726 # population creator by random or with oppositions 727 if init_creator is None: 728 729 from OppOpPopInit import SampleInitializers 730 731 # just uniform random 732 self.creator = SampleInitializers.Combined( 733 minimums=[v[0] for v in self.var_bounds], 734 maximums=[v[1] for v in self.var_bounds], 735 indexes=( 736 self.indexes_int, 737 self.indexes_float 738 ), 739 creator_initializers=( 740 SampleInitializers.RandomInteger, 741 SampleInitializers.Uniform 742 ) 743 ) 744 else: 745 assert callable(init_creator) 746 self.creator = init_creator 747 748 self.init_oppositors = init_oppositors 749 self.dup_oppositor = duplicates_oppositor 750 self.revolution_oppositor = revolution_oppositor 751 752 # event for removing duplicates 753 if remove_duplicates_generation_step is None: 754 def remover(pop: array2D, scores: array1D, gen: int) -> Tuple[ 755 array2D, 756 array1D 757 ]: 758 return pop, scores 759 else: 760 761 def without_dup(pop: array2D, scores: array1D): # returns population without dups 762 _, index_of_dups = np.unique(pop, axis=0, return_index=True) 763 return pop[index_of_dups], scores[index_of_dups], scores.size - index_of_dups.size 764 765 if self.dup_oppositor is None: # if there is no dup_oppositor, use random creator 766 def remover(pop: array2D, scores: array1D, gen: int) -> Tuple[ 767 array2D, 768 array1D 769 ]: 770 if gen 
% remove_duplicates_generation_step != 0: 771 return pop, scores 772 773 pp, sc, count_to_create = without_dup(pop, scores) # pop without dups 774 775 if count_to_create == 0: 776 if SHOW_PROGRESS: 777 show_progress(t, self.max_iterations, 778 f"GA is running...{t} gen from {self.max_iterations}. No dups!") 779 return pop, scores 780 781 pp2 = SampleInitializers.CreateSamples(self.creator, count_to_create) # new pop elements 782 pp2_scores = self.set_function(pp2) # new elements values 783 784 if SHOW_PROGRESS: 785 show_progress(t, self.max_iterations, 786 f"GA is running...{t} gen from {self.max_iterations}. Kill dups!") 787 788 new_pop = np.vstack((pp, pp2)) 789 new_scores = np.concatenate((sc, pp2_scores)) 790 791 args_to_sort = new_scores.argsort() 792 return new_pop[args_to_sort], new_scores[args_to_sort] 793 794 else: # using oppositors 795 assert callable(self.dup_oppositor) 796 797 def remover(pop: np.ndarray, scores: np.ndarray, gen: int) -> Tuple[ 798 np.ndarray, 799 np.ndarray 800 ]: 801 if gen % remove_duplicates_generation_step != 0: 802 return pop, scores 803 804 pp, sc, count_to_create = without_dup(pop, scores) # pop without dups 805 806 if count_to_create == 0: 807 if SHOW_PROGRESS: 808 show_progress(t, self.max_iterations, 809 f"GA is running...{t} gen from {self.max_iterations}. No dups!") 810 return pop, scores 811 812 if count_to_create > sc.size: 813 raise Exception( 814 f"Too many duplicates at generation {gen} ({count_to_create} > {sc.size}), cannot oppose" 815 ) 816 817 # oppose count_to_create worse elements 818 pp2 = OppositionOperators.Reflect(pp[-count_to_create:], self.dup_oppositor) # new pop elements 819 pp2_scores = self.set_function(pp2) # new elements values 820 821 if SHOW_PROGRESS: 822 show_progress(t, self.max_iterations, 823 f"GA is running...{t} gen from {self.max_iterations}. 
Kill dups!") 824 825 new_pop = np.vstack((pp, pp2)) 826 new_scores = np.concatenate((sc, pp2_scores)) 827 828 args_to_sort = new_scores.argsort() 829 return new_pop[args_to_sort], new_scores[args_to_sort] 830 831 # event for revolution 832 if revolution_after_stagnation_step is None: 833 def revolution(pop: array2D, scores: array1D, stagnation_count: int) -> Tuple[ 834 array2D, 835 array1D 836 ]: 837 return pop, scores 838 else: 839 if revolution_oppositor is None: 840 raise Exception( 841 f"How can I make revolution each {revolution_after_stagnation_step} stagnation steps " 842 f"if revolution_oppositor is None (not defined)?" 843 ) 844 assert callable(revolution_oppositor) 845 846 from OppOpPopInit import OppositionOperators 847 848 def revolution(pop: array2D, scores: array1D, stagnation_count: int) -> Tuple[ 849 array2D, 850 array1D 851 ]: 852 if stagnation_count < revolution_after_stagnation_step: 853 return pop, scores 854 part = int(pop.shape[0]*revolution_part) 855 856 pp2 = OppositionOperators.Reflect(pop[-part:], self.revolution_oppositor) 857 pp2_scores = self.set_function(pp2) 858 859 combined = np.vstack((pop, pp2)) 860 combined_scores = np.concatenate((scores, pp2_scores)) 861 args = combined_scores.argsort() 862 863 if SHOW_PROGRESS: 864 show_progress(t, self.max_iterations, 865 f"GA is running...{t} gen from {self.max_iterations}. 
Revolution!") 866 867 args = args[:scores.size] 868 return combined[args], combined_scores[args] 869 870 #enregion 871 872 # 873 # 874 # START ALGORITHM LOGIC 875 # 876 # 877 878 # Report 879 self._clear_report() # clear old report objects 880 self._init_report() 881 882 # initialization of pop 883 884 if start_generation.variables is None: 885 886 from OppOpPopInit import init_population 887 888 real_pop_size = self.population_size * pop_coef 889 890 # pop = np.empty((real_pop_size, self.dim)) 891 scores = np.empty(real_pop_size) 892 893 pop = init_population( 894 samples_count=real_pop_size, 895 creator=self.creator, 896 oppositors=self.init_oppositors 897 ) 898 899 if self.funtimeout and self.funtimeout > 0: # perform simulation 900 901 time_counter = 0 902 903 for p in range(0, real_pop_size): 904 # simulation returns exception or func value -- check the time of evaluating 905 value, eval_time = self._simulate(pop[p]) 906 scores[p] = value 907 time_counter += eval_time 908 909 if enable_printing: 910 print( 911 f"\nSim: Average time of function evaluating (secs): " 912 f"{time_counter/real_pop_size} (total = {time_counter})\n" 913 ) 914 else: 915 916 eval_time = time.time() 917 scores = self.set_function(pop) 918 eval_time = time.time() - eval_time 919 if enable_printing: 920 print( 921 f"\nSet: Average time of function evaluating (secs): " 922 f"{eval_time/real_pop_size} (total = {eval_time})\n" 923 ) 924 925 else: 926 927 self.population_size = start_generation.variables.shape[0] 928 self._set_elit_count(self.population_size, self.param.elit_ratio) 929 self._set_parents_count(self.param.parents_portion) 930 931 pop = start_generation.variables 932 933 if start_generation.scores is None: 934 935 _time = time.time() 936 scores = self.set_function(pop) 937 _time = time.time() - _time 938 939 if enable_printing: 940 print( 941 f'\nFirst scores are made from gotten variables ' 942 f'(by {_time} secs, about {_time/scores.size} for each creature)\n' 943 ) 944 else: 
945 scores = start_generation.scores 946 if enable_printing: 947 print(f"\nFirst scores are from gotten population\n") 948 949 # Initialization by select bests and local_descent 950 951 pop, scores = initializer_func(pop, scores) 952 953 # first sort 954 args_to_sort = scores.argsort() 955 pop = pop[args_to_sort] 956 scores = scores[args_to_sort] 957 self._update_report(scores) 958 959 self.population_size = scores.size 960 self.best_function = scores[0] 961 962 if enable_printing: 963 print( 964 f"Best score before optimization: {self.best_function}" 965 ) 966 967 t: int = 1 968 count_stagnation: int = 0 969 """iterations without progress""" 970 reason_to_stop: Optional[str] = None 971 972 # gets indexes of 2 parents to crossover 973 if studEA: 974 get_parents_inds = lambda: (0, random.randrange(1, self.parents_count)) 975 else: 976 get_parents_inds = lambda: random_indexes_pair(self.parents_count) 977 978 # while no reason to stop 979 while True: 980 981 if count_stagnation > self.max_stagnations: 982 reason_to_stop = f"limit of fails: {count_stagnation}" 983 elif t == self.max_iterations: 984 reason_to_stop = f'limit of iterations: {t}' 985 elif stop_by_val(self.best_function): 986 reason_to_stop = f"stop value reached: {self.best_function} <= {stop_when_reached}" 987 elif time_is_done(): 988 reason_to_stop = f'time is done: {time.time() - start_time} >= {time_limit_secs}' 989 990 if reason_to_stop is not None: 991 if SHOW_PROGRESS: 992 show_progress(t, self.max_iterations, f"GA is running... STOP! 
{reason_to_stop}") 993 break 994 995 if scores[0] < self.best_function: # if there is progress 996 count_stagnation = 0 997 self.best_function = scores[0] 998 else: 999 count_stagnation += 1 1000 1001 if SHOW_PROGRESS: 1002 show_progress( 1003 t, 1004 self.max_iterations, 1005 f"GA is running...{t} gen from {self.max_iterations}...best value = {self.best_function}" 1006 ) 1007 1008 # Select parents 1009 1010 par: array2D = np.empty((self.parents_count, self.dim)) 1011 """samples chosen to create new samples""" 1012 par_scores: array1D = np.empty(self.parents_count) 1013 1014 elit_slice = slice(None, self.elit_count) 1015 """elit parents""" 1016 1017 # copy needs because the generation will be removed after parents selection 1018 par[elit_slice] = pop[elit_slice].copy() 1019 par_scores[elit_slice] = scores[elit_slice].copy() 1020 1021 new_par_inds = ( 1022 self.selection( 1023 scores[self.elit_count:], 1024 self.parents_count - self.elit_count 1025 ).astype(np.int16) + self.elit_count 1026 ) 1027 """non-elit parents indexes""" 1028 #new_par_inds = self.selection(scores, self.parents_count - self.elit_count).astype(np.int16) 1029 par_slice = slice(self.elit_count, self.parents_count) 1030 par[par_slice] = pop[new_par_inds].copy() 1031 par_scores[par_slice] = scores[new_par_inds].copy() 1032 1033 pop = np.empty((self.population_size, self.dim)) 1034 """new generation""" 1035 scores = np.empty(self.population_size) 1036 """new generation scores""" 1037 1038 parents_slice = slice(None, self.parents_count) 1039 pop[parents_slice] = par 1040 scores[parents_slice] = par_scores 1041 1042 if self.fill_children is None: # default fill children behaviour 1043 DO_MUTATION = self.needs_mutation 1044 for k in range(self.parents_count, self.population_size, 2): 1045 1046 r1, r2 = get_parents_inds() 1047 1048 pvar1 = pop[r1] # equal to par[r1], but better for cache 1049 pvar2 = pop[r2] 1050 1051 ch1, ch2 = self.crossover(pvar1, pvar2) 1052 1053 if DO_MUTATION: 1054 ch1 = 
self.mut(ch1) 1055 ch2 = self.mut_middle(ch2, pvar1, pvar2) 1056 1057 pop[k] = ch1 1058 pop[k+1] = ch2 1059 else: # custom behaviour 1060 self.fill_children(pop, self.parents_count) 1061 1062 if apply_function_to_parents: 1063 scores = self.set_function(pop) 1064 else: 1065 scores[self.parents_count:] = self.set_function(pop[self.parents_count:]) 1066 1067 # remove duplicates 1068 pop, scores = remover(pop, scores, t) 1069 # revolution 1070 pop, scores = revolution(pop, scores, count_stagnation) 1071 1072 # make population better 1073 args_to_sort = scores.argsort() 1074 pop = pop[args_to_sort] 1075 scores = scores[args_to_sort] 1076 self._update_report(scores) 1077 1078 # callback it 1079 total_callback(t, self.report, pop, scores) 1080 total_middle_callback() 1081 1082 t += 1 1083 1084 self.best_function = fast_min(scores[0], self.best_function) 1085 1086 last_generation = Generation(pop, scores) 1087 self.result = GAResult(last_generation) 1088 1089 if save_last_generation_as is not None: 1090 last_generation.save(save_last_generation_as) 1091 1092 if enable_printing: 1093 show = ' ' * 200 1094 sys.stdout.write( 1095 f'\r{show}\n' 1096 f'\r The best found solution:\n {pop[0]}' 1097 f'\n\n Objective function:\n {self.best_function}\n' 1098 f'\n Used generations: {len(self.report)}' 1099 f'\n Used time: {time.time() - start_time:.3g} seconds\n' 1100 ) 1101 sys.stdout.flush() 1102 1103 if not no_plot: 1104 self.plot_results() 1105 1106 if enable_printing: 1107 if reason_to_stop is not None and 'iterations' not in reason_to_stop: 1108 sys.stdout.write( 1109 f'\nWarning: GA is terminated because of {reason_to_stop}' 1110 ) 1111 1112 return self.result 1113 1114 #endregion 1115 1116 #region PLOTTING 1117 1118 def plot_results( 1119 self, 1120 title: str = 'Genetic Algorithm', 1121 save_as: Optional[str] = None, 1122 dpi: int = 200, 1123 main_color: str = 'blue' 1124 ): 1125 """ 1126 Simple plot of self.report (if not empty) 1127 """ 1128 if len(self.report) == 0: 1129 
sys.stdout.write("No results to plot!\n") 1130 return 1131 1132 plot_several_lines( 1133 lines=[self.report], 1134 colors=[main_color], 1135 labels=['best of generation'], 1136 linewidths=[2], 1137 title=title, 1138 xlabel='Generation', 1139 ylabel='Minimized function', 1140 save_as=save_as, 1141 dpi=dpi 1142 ) 1143 1144 def plot_generation_scores(self, title: str = 'Last generation scores', save_as: Optional[str] = None): 1145 """ 1146 Plots barplot of scores of last population 1147 """ 1148 1149 if not hasattr(self, 'result'): 1150 raise Exception( 1151 "There is no 'result' field into ga object! Before plotting generation u need to run seaching process" 1152 ) 1153 1154 plot_pop_scores(self.result.last_generation.scores, title, save_as) 1155 1156 #endregion 1157 1158 #region MUTATION 1159 def mut(self, x: array1D): 1160 """ 1161 just mutation 1162 """ 1163 1164 for i in self.indexes_int_mut: 1165 if random.random() < self.prob_mut_discrete: 1166 x[i] = self.discrete_mutation(x[i], *self.var_bounds[i]) 1167 1168 for i in self.indexes_float_mut: 1169 if random.random() < self.prob_mut: 1170 x[i] = self.real_mutation(x[i], *self.var_bounds[i]) 1171 1172 return x 1173 1174 def mut_middle(self, x: array1D, p1: array1D, p2: array1D): 1175 """ 1176 mutation oriented on parents 1177 """ 1178 for i in self.indexes_int_mut: 1179 1180 if random.random() < self.prob_mut_discrete: 1181 v1, v2 = p1[i], p2[i] 1182 if v1 < v2: 1183 x[i] = random.randint(v1, v2) 1184 elif v1 > v2: 1185 x[i] = random.randint(v2, v1) 1186 else: 1187 x[i] = random.randint(*self.var_bounds[i]) 1188 1189 for i in self.indexes_float_mut: 1190 if random.random() < self.prob_mut: 1191 v1, v2 = p1[i], p2[i] 1192 if v1 != v2: 1193 x[i] = random.uniform(v1, v2) 1194 else: 1195 x[i] = random.uniform(*self.var_bounds[i]) 1196 return x 1197 1198 #endregion 1199 1200 #region Set functions 1201 1202 @staticmethod 1203 def default_set_function(function_for_set: Callable[[array1D], float]): 1204 """ 1205 simple 
function for creating set_function 1206 function_for_set just applies to each row of population 1207 """ 1208 def func(matrix: array2D): 1209 return np.array( 1210 [function_for_set(row) for row in matrix] 1211 ) 1212 return func 1213 1214 @staticmethod 1215 def vectorized_set_function(function_for_set: Callable[[array1D], float]): 1216 """ 1217 works like default, but faster for big populations and slower for little 1218 function_for_set just applyes to each row of population 1219 """ 1220 1221 func = np.vectorize(function_for_set, signature='(n)->()') 1222 return func 1223 1224 @staticmethod 1225 def set_function_multiprocess(function_for_set: Callable[[array1D], float], n_jobs: int = -1): 1226 """ 1227 like function_for_set but uses joblib with n_jobs (-1 goes to count of available processors) 1228 """ 1229 try: 1230 from joblib import Parallel, delayed 1231 except ModuleNotFoundError: 1232 raise ModuleNotFoundError( 1233 "this additional feature requires joblib package," 1234 "run `pip install joblib` or `pip install --upgrade geneticalgorithm2[full]`" 1235 "or use another set function" 1236 ) 1237 1238 def func(matrix: array2D): 1239 result = Parallel(n_jobs=n_jobs)(delayed(function_for_set)(matrix[i]) for i in range(matrix.shape[0])) 1240 return np.array(result) 1241 return func 1242 1243 #endregion 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258
The variable type for a given dimension or for all dimensions; it determines how the values are discretized: real: floating-point (double) numbers; int: integer numbers only; bool: in fact an integer with bounds [0, 1]
class GeneticAlgorithm2:
    """
    Genetic algorithm optimization process
    """

    default_params = AlgorithmParams()
    PROGRESS_BAR_LEN = 20
    """max count of symbols in the progress bar"""

    @property
    def output_dict(self):
        """deprecated alias of `result`, warns on every access"""
        warnings.warn(
            "'output_dict' is deprecated and will be removed at version 7 \n"
            "use 'result' instead"
        )
        return self.result

    @property
    def needs_mutation(self) -> bool:
        """whether the mutation is required"""
        return self.prob_mut > 0 or self.prob_mut_discrete > 0

    #region INIT

    def __init__(
        self,
        function: FunctionToMinimize = None,

        dimension: int = 0,
        variable_type: Union[VARIABLE_TYPE, Sequence[VARIABLE_TYPE]] = 'bool',
        variable_boundaries: Optional[Union[array2D, Sequence[Tuple[float, float]]]] = None,

        variable_type_mixed=None,

        function_timeout: Optional[float] = None,
        algorithm_parameters: Union[AlgorithmParams, Dict[str, Any]] = default_params
    ):
        """
        initializes the GA object and performs main checks

        Args:
            function: the given objective function to be minimized -- deprecated and moved to run() method
            dimension: the number of decision variables, the population samples dimension

            variable_type: string means the variable type for all variables,
                for mixed types use sequence of strings of type for each variable

            variable_boundaries: leave it None if variable_type is 'bool';
                otherwise provide a sequence of tuples of length two as boundaries for each variable;
                the length of the array must be equal dimension.
                For example, ([0,100], [0,200]) determines
                    lower boundary 0 and upper boundary 100 for first
                    and upper boundary 200 for second variable
                    and dimension must be 2.

            variable_type_mixed -- deprecated

            function_timeout: if the given function does not provide
                output before function_timeout (unit is seconds) the algorithm raises error.
                For example, when there is an infinite loop in the given function.
                `None` means disabling -- deprecated and moved to run()

            algorithm_parameters: AlgorithmParams object or usual dictionary with algorithm parameter;
                it is not mandatory to provide all possible parameters

        Notes:
            - This implementation minimizes the given objective function.
            For maximization u can multiply the function by -1 (for instance): the absolute
                value of the output would be the actual objective function

        for more details and examples of implementation please visit:
            https://github.com/PasaOpasen/geneticalgorithm2

        """
        import copy

        # all default fields

        # self.crossover: Callable[[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray]] = None
        # self.real_mutation: Callable[[float, float, float], float] = None
        # self.discrete_mutation: Callable[[int, int, int], int] = None
        # self.selection: Callable[[np.ndarray, int], np.ndarray] = None

        self.result = None
        self.best_function = None

        self.revolution_oppositor = None
        self.dup_oppositor = None
        self.creator = None
        self.init_oppositors = None

        self.f: Callable[[array1D], float] = None
        self.funtimeout: float = None

        self.set_function: Callable[[np.ndarray], np.ndarray] = None

        # self.dim: int = None
        self.var_bounds: List[Tuple[Union[int, float], Union[int, float]]] = None
        # self.indexes_int: np.ndarray = None
        # self.indexes_float: np.ndarray = None

        self.checked_reports: List[Tuple[str, Callable[[array1D], None]]] = None

        self.population_size: int = None
        self.progress_stream = None

        # input algorithm's parameters

        assert isinstance(algorithm_parameters, (dict, AlgorithmParams)), (
            "algorithm_parameters must be dict or AlgorithmParams object"
        )
        if not isinstance(algorithm_parameters, AlgorithmParams):
            algorithm_parameters = AlgorithmParams.from_dict(algorithm_parameters)
        elif algorithm_parameters is GeneticAlgorithm2.default_params:
            # self.param is modified later (e.g. by middle callbacks in run()),
            # so the class-level default must not be shared between instances
            algorithm_parameters = copy.copy(algorithm_parameters)
        algorithm_parameters.validate()
        self.param = algorithm_parameters

        self.crossover, self.real_mutation, self.discrete_mutation, self.selection = algorithm_parameters.get_CMS_funcs()

        # dimension and area bounds
        self.dim = int(dimension)
        assert self.dim > 0, f"dimension count must be int and >0, gotten {dimension}"

        if variable_type_mixed is not None:
            warnings.warn(
                f"argument variable_type_mixed is deprecated and will be removed at version 7\n "
                f"use variable_type={tuple(variable_type_mixed)} instead"
            )
            variable_type = variable_type_mixed
        self._set_types_indexes(variable_type)  # types indexes
        self._set_var_boundaries(variable_type, variable_boundaries)  # input variables' boundaries

        # fix mutation probs

        assert can_be_prob(self.param.mutation_probability)
        self.prob_mut = self.param.mutation_probability
        discrete_prob = self.param.mutation_discrete_probability
        assert discrete_prob is None or can_be_prob(discrete_prob)
        # only None means "same as mutation_probability"; an explicit 0 must stay 0
        self.prob_mut_discrete = self.prob_mut if discrete_prob is None else discrete_prob

        if self.param.crossover_probability is not None:
            warnings.warn(
                f"crossover_probability is deprecated and will be removed in version 7. "
                f"Reason: it's old and has no sense"
            )

        #############################################################
        # input function
        if function:
            warnings.warn(
                f"function is deprecated in init constructor and will be removed in version 7. "
                f"Move this argument to run() method"
            )
            self._check_function(function)
        if function_timeout:
            warnings.warn(
                f"function_timeout is deprecated in init constructor and will be removed in version 7. "
                f"Move this argument to run() method"
            )
            self._check_function_timeout(function_timeout)

        #############################################################

        self.population_size = int(self.param.population_size)
        self._set_parents_count(self.param.parents_portion)
        self._set_elit_count(self.population_size, self.param.elit_ratio)
        assert self.parents_count >= self.elit_count, (
            f"\n number of parents ({self.parents_count}) "
            f"must be greater than number of elits ({self.elit_count})"
        )

        self._set_max_iterations()

        self._set_report()

        # specify this function to speed up or change default behaviour
        self.fill_children: Optional[Callable[[array2D, int], None]] = None
        """
        custom function which adds children for population POP 
            where POP[:parents_count] are parents lines and next lines are for children
        """

    def _set_types_indexes(self, variable_type: Union[str, Sequence[str]]):
        """
        fills `indexes_int` and `indexes_float` with the dimension indexes of each variable type

        a single 'bool'/'int' string marks all dimensions as integer, 'real' -- as float;
        a sequence must contain 'int'/'real' for each dimension
        """

        indexes = np.arange(self.dim)
        # the empty defaults share the integer dtype of real indexes, so they stay valid index arrays
        self.indexes_int = np.array([], dtype=indexes.dtype)
        self.indexes_float = np.array([], dtype=indexes.dtype)

        assert_message = (
            f"\n variable_type must be 'bool', 'int', 'real' or a sequence with 'int' and 'real', got {variable_type}"
        )

        if isinstance(variable_type, str):
            assert (variable_type in VARIABLE_TYPE.__args__), assert_message
            if variable_type == 'real':
                self.indexes_float = indexes
            else:
                self.indexes_int = indexes

        else:  # sequence case

            assert len(variable_type) == self.dim, (
                f"\n variable_type must have a length equal dimension. "
                f"Should be {self.dim}, got {len(variable_type)}"
            )
            assert 'bool' not in variable_type, (
                "don't use 'bool' if variable_type is a sequence, "
                "for 'boolean' case use 'int' and specify boundary as (0,1)"
            )
            assert all(v in VARIABLE_TYPE.__args__ for v in variable_type), assert_message

            vartypes = np.array(variable_type)
            self.indexes_int = indexes[vartypes == 'int']
            self.indexes_float = indexes[vartypes == 'real']

    def _set_var_boundaries(
        self,
        variable_type: Union[str, Sequence[str]],
        variable_boundaries
    ):
        """
        validates the boundaries and stores them to `var_bounds` as list of (lower, upper) pairs;
        for the single 'bool' type the boundaries are always (0, 1)
        """
        if isinstance(variable_type, str) and variable_type == 'bool':
            self.var_bounds = [(0, 1)] * self.dim
        else:

            # explicit message instead of the obscure error from len(None)
            assert variable_boundaries is not None, (
                "\n variable_boundaries must be set if variable_type is not 'bool'"
            )

            if is_numpy(variable_boundaries):
                assert variable_boundaries.shape == (self.dim, 2), (
                    f"\n if variable_boundaries is numpy array, it must be with shape (dim, 2)"
                )
            else:
                assert len(variable_boundaries) == self.dim and all((len(t) == 2 for t in variable_boundaries)), (
                    "\n if variable_boundaries is sequence, "
                    "it must be with len dim and boundary for each variable must be a tuple of length two"
                )

            for i in variable_boundaries:
                assert i[0] <= i[1], "\n lower_boundaries must be smaller than upper_boundaries [lower,upper]"

            self.var_bounds = [(i[0], i[1]) for i in variable_boundaries]

    def _set_parents_count(self, parents_portion: float):
        """
        sets `parents_count` from the portion of current `population_size`;
        the count is increased by 1 if needed to make the number of children even
        (children are created by pairs)
        """

        self.parents_count = int(parents_portion * self.population_size)
        assert self.population_size >= self.parents_count > 1, (
            f'parents count {self.parents_count} must be greater than 1 '
            f'and cannot exceed population size {self.population_size}'
        )
        trl = self.population_size - self.parents_count
        if trl % 2 != 0:
            self.parents_count += 1

    def _set_elit_count(self, pop_size: int, elit_ratio: Union[float, int]):
        """
        sets `elit_count` as ceil(pop_size * elit_ratio)
        """

        assert elit_ratio >= 0
        # the former `isinstance(elit_ratio, str)` branch was unreachable:
        # a string cannot pass the comparison above
        self.elit_count = math.ceil(pop_size*elit_ratio)

    def _set_max_iterations(self):
        """
        sets `max_iterations` (from params or estimated by the search space size)
        and `max_stagnations` (from params or max_iterations + 1)
        """

        if self.param.max_num_iteration is None:
            iterate = 0
            for i in range(0, self.dim):
                bound_min, bound_max = self.var_bounds[i]
                var_space = bound_max - bound_min
                if i in self.indexes_int:
                    iterate += var_space * self.dim * (100 / self.population_size)
                else:
                    iterate += var_space * 50 * (100 / self.population_size)
            iterate = int(iterate)
            if (iterate * self.population_size) > 10000000:
                # must stay integer: run() stops on `t == self.max_iterations`,
                # a fractional limit would never be equal to the generation counter
                iterate = 10000000 // self.population_size

            self.max_iterations = fast_min(iterate, 8000)
        else:
            assert self.param.max_num_iteration > 0
            self.max_iterations = math.ceil(self.param.max_num_iteration)

        max_it = self.param.max_iteration_without_improv
        if max_it is None:
            self.max_stagnations = self.max_iterations + 1
        else:
            self.max_stagnations = math.ceil(max_it)

    def _check_function(self, function: Callable[[array1D], float]):
        """checks the function is callable and stores it to `f`"""
        assert callable(function), "function must be callable!"
        self.f = function

    def _check_function_timeout(self, function_timeout: Optional[float]):
        """
        stores the timeout to `funtimeout` (as float or None);
        for positive timeout checks that func_timeout package is available
        """

        if function_timeout is not None and function_timeout > 0:
            try:
                from func_timeout import func_timeout, FunctionTimedOut
            except ModuleNotFoundError:
                raise ModuleNotFoundError(
                    "function_timeout > 0 needs additional package func_timeout\n"
                    "run `python -m pip install func_timeout`\n"
                    "or disable this parameter: function_timeout=None"
                )

        self.funtimeout = None if function_timeout is None else float(function_timeout)

    #endregion

    #region REPORT

    def _set_report(self):
        """
        creates default report checker
        """
        self.checked_reports = [
            # item 0 cuz scores will be sorted and min item is items[0]
            ('report', operator.itemgetter(0))
        ]

    def _clear_report(self):
        """
        removes all report objects
        """
        # names are collected first because attributes cannot be deleted while iterating vars()
        fields = [f for f in vars(self) if f.startswith('report')]
        for attr in fields:
            delattr(self, attr)

    def _init_report(self):
        """
        makes empty report fields
        """
        for name, _ in self.checked_reports:
            setattr(self, name, [])

    def _update_report(self, scores: array1D):
        """
        append report value to the end of field
        """
        for name, func in self.checked_reports:
            getattr(self, name).append(
                func(scores)
            )

    #endregion

    #region RUN METHODS

    def _progress(self, count: int, total: int, status: str = ''):
        """
        rewrites the progress bar line in `progress_stream`

        Args:
            count: current step
            total: total steps count
            status: text to show after the bar
        """

        # zero total is shown as a completed bar instead of ZeroDivisionError
        part = count / total if total else 1.0

        filled_len = round(GeneticAlgorithm2.PROGRESS_BAR_LEN * part)
        percents = round(100.0 * part, 1)
        bar = '|' * filled_len + '_' * (GeneticAlgorithm2.PROGRESS_BAR_LEN - filled_len)

        self.progress_stream.write('\r%s %s%s %s' % (bar, percents, '%', status))
        self.progress_stream.flush()

    def __str__(self):
        return f"Genetic algorithm object with parameters {self.param}"

    def __repr__(self):
        return self.__str__()

    def _simulate(self, sample: array1D):
        """
        evaluates `f` on the sample under `funtimeout` limit

        Returns:
            pair (function value, evaluation time in seconds)

        Raises:
            AssertionError: if the function gives no output (including the timeout case)
                or the output is not a number
        """

        from func_timeout import func_timeout, FunctionTimedOut

        obj = None
        eval_time = time.time()
        try:
            obj = func_timeout(
                self.funtimeout,
                lambda: self.f(sample)
            )
        except FunctionTimedOut:
            print("given function is not applicable")
        eval_time = time.time() - eval_time

        assert obj is not None, (
            f"the given function was running like {eval_time} seconds and does not provide any output"
        )

        tp = type(obj)
        assert (
            tp in (int, float) or np.issubdtype(tp, np.floating) or np.issubdtype(tp, np.integer)
        ), f"Minimized function should return a number, but got '{obj}' object with type {tp}"

        return obj, eval_time

    def _set_mutation_indexes(self, mutation_indexes: Optional[Iterable[int]]):
        """
        sets `indexes_int_mut` and `indexes_float_mut` -- the dimensions where mutation is allowed;
        None means all dimensions of the matching type
        """

        if mutation_indexes is None:
            self.indexes_float_mut = self.indexes_float
            self.indexes_int_mut = self.indexes_int
        else:
            tmp_indexes = set(mutation_indexes)
            # explicit integer dtype: an empty intersection would give a float array otherwise
            self.indexes_int_mut = np.array(list(tmp_indexes.intersection(self.indexes_int)), dtype=int)
            self.indexes_float_mut = np.array(list(tmp_indexes.intersection(self.indexes_float)), dtype=int)

            if self.indexes_float_mut.size == 0 and self.indexes_int_mut.size == 0:
                warnings.warn(f"No mutation dimensions!!! Check ur mutation indexes!!")

    # NOTE: the line below is the unformatted beginning of run(), kept byte-identical
    # because the rest of that method lies outside this block.
    #@profile 445 def run( 446 self, 447 no_plot: bool = False, 448 disable_printing: bool = False, 449 progress_bar_stream: Optional[str] = 'stdout', 450 451 # deprecated 452 disable_progress_bar: bool = False, 453 454 function: FunctionToMinimize = None, 455 function_timeout: Optional[float] = None, 456 457 set_function: SetFunctionToMinimize = None, 458 apply_function_to_parents: bool = False, 459 start_generation: GenerationConvertible = Generation(), 460 studEA: bool = False, 461 mutation_indexes: Optional[Iterable[int]] = None, 462 463 init_creator: Optional[CreatorFunc] = None, 464 init_oppositors: Optional[Sequence[OppositorFunc]] = None, 465 466 duplicates_oppositor: Optional[OppositorFunc] = None, 467 remove_duplicates_generation_step: Optional[int] = None, 468 469 revolution_oppositor: Optional[OppositorFunc] = None, 470 revolution_after_stagnation_step: Optional[int] = None, 471 revolution_part: float = 0.3, 472 473 population_initializer: Tuple[ 474 int, PopulationModifier 475 ] = get_population_initializer(select_best_of=1, local_optimization_step='never', local_optimizer=None), 476 477 stop_when_reached: Optional[float] = None, 478 callbacks: Optional[Sequence[SimpleCallbackFunc]] = None, 479 middle_callbacks: Optional[Sequence[MiddleCallbackFunc]] = None, #+ 480 time_limit_secs: Optional[float] = None, 481 save_last_generation_as: Optional[str] = None, 482 seed: Optional[int] = None 483 ) -> GAResult: 484 """ 485 runs optimization process 486 487 Args: 488 no_plot: do not plot results using matplotlib by default 489 490 disable_printing: do not print log info of optimization process (except progress bar) 491 492 
progress_bar_stream: 'stdout', 'stderr' or None to disable progress bar; 493 disabling progress bar can speed up the optimization process in many cases 494 495 disable_progress_bar: deprecated 496 497 function: the given objective function (sample -> its score) to be minimized; 498 499 function_timeout: if the given function does not provide 500 output before function_timeout (unit is seconds) the algorithm raises error. 501 For example, when there is an infinite loop in the given function. 502 `None` means disabling 503 504 set_function: set function (all samples -> score per sample) to be used instead of usual function 505 (usually for optimization purposes) 506 507 apply_function_to_parents: whether to apply function to parents from previous generation (if it's needed), 508 it can be needed at working with games agents, but for other tasks will just waste time 509 510 start_generation: initial generation object of any `GenerationConvertible` type 511 512 studEA: using stud EA strategy (crossover with best object always) 513 514 mutation_indexes: indexes of dimensions where mutation can be performed (all dimensions by default) 515 516 init_creator: the function creates population samples. 517 By default -- random uniform for real variables and random uniform for int 518 init_oppositors: the list of oppositors creates oppositions for base population. No by default 519 520 duplicates_oppositor: oppositor for applying after duplicates removing. 521 By default -- using just random initializer from creator 522 remove_duplicates_generation_step: step for removing duplicates (have a sense with discrete tasks). 523 No by default 524 525 revolution_oppositor: oppositor for revolution time. No by default 526 revolution_after_stagnation_step: create revolution after this generations of stagnation. No by default 527 revolution_part: float, the part of generation to being oppose. 
By default is 0.3 528 529 population_initializer: object for actions at population initialization step 530 to create better start population. See doc 531 532 stop_when_reached: stop searching after reaching this value 533 (it can be potential minimum or something else) 534 535 callbacks: sequence of callback functions with structure: 536 (generation_number, report_list, last_population, last_scores) -> do some action 537 538 middle_callbacks: sequence of functions made by `MiddleCallback` class 539 540 time_limit_secs: limit time of working (in seconds); 541 `None` means no time limit (limit will be only for count of generation and so on) 542 543 save_last_generation_as: path to .npz file 544 for saving last_generation as numpy dictionary like 545 {'population': 2D-array, 'scores': 1D-array}; 546 None disables this option 547 548 seed: random seed (None if doesn't matter) 549 550 Returns: 551 `GAResult` object; 552 also fills the self.report and self.result with many report information 553 554 Notes: 555 - if `function_timeout` is enabled then `function` must be set 556 - it would be more logical to use params like `studEA` as an algorithm parameter, 557 but `run()`-way can be more convenient for real using 558 """ 559 560 if disable_progress_bar: 561 warnings.warn( 562 f"disable_progress_bar is deprecated and will be removed in version 7, " 563 f"use probress_bar_stream=None to disable progress bar" 564 ) 565 progress_bar_stream = None 566 567 enable_printing: bool = not disable_printing 568 569 start_generation = Generation.from_object(self.dim, start_generation) 570 571 assert is_current_gen_number(revolution_after_stagnation_step), "must be None or int and >0" 572 assert is_current_gen_number(remove_duplicates_generation_step), "must be None or int and >0" 573 assert can_be_prob(revolution_part), f"revolution_part must be in [0,1], not {revolution_part}" 574 assert stop_when_reached is None or isinstance(stop_when_reached, (int, float)) 575 assert 
isinstance(callbacks, collections.abc.Sequence) or callbacks is None, ( 576 "callbacks should be a list of callbacks functions" 577 ) 578 assert isinstance(middle_callbacks, collections.abc.Sequence) or middle_callbacks is None, ( 579 "middle_callbacks should be list of MiddleCallbacks functions" 580 ) 581 assert time_limit_secs is None or time_limit_secs > 0, 'time_limit_secs must be None of number > 0' 582 583 self._set_mutation_indexes(mutation_indexes) 584 from OppOpPopInit import set_seed 585 set_seed(seed) 586 587 # randomstate = np.random.default_rng(random.randint(0, 1000) if seed is None else seed) 588 # self.randomstate = randomstate 589 590 # using bool flag is faster than using empty function with generated string arguments 591 SHOW_PROGRESS = progress_bar_stream is not None 592 if SHOW_PROGRESS: 593 594 show_progress = lambda t, t2, s: self._progress(t, t2, status=s) 595 596 if progress_bar_stream == 'stdout': 597 self.progress_stream = sys.stdout 598 elif progress_bar_stream == 'stderr': 599 self.progress_stream = sys.stderr 600 else: 601 raise Exception( 602 f"wrong value {progress_bar_stream} of progress_bar_stream, must be 'stdout'/'stderr'/None" 603 ) 604 else: 605 show_progress = None 606 607 stop_by_val = ( 608 (lambda best_f: False) 609 if stop_when_reached is None 610 else (lambda best_f: best_f <= stop_when_reached) 611 ) 612 613 t: int = 0 614 count_stagnation: int = 0 615 pop: array2D = None 616 scores: array1D = None 617 618 #region CALLBACKS 619 620 def get_data(): 621 """ 622 returns all important data about model 623 """ 624 return MiddleCallbackData( 625 last_generation=Generation(pop, scores), 626 current_generation=t, 627 report_list=self.report, 628 629 mutation_prob=self.prob_mut, 630 mutation_discrete_prob=self.prob_mut_discrete, 631 mutation=self.real_mutation, 632 mutation_discrete=self.discrete_mutation, 633 crossover=self.crossover, 634 selection=self.selection, 635 636 current_stagnation=count_stagnation, 637 
max_stagnation=self.max_stagnations, 638 639 parents_portion=self.param.parents_portion, 640 elit_ratio=self.param.elit_ratio, 641 642 set_function=self.set_function, 643 644 reason_to_stop=reason_to_stop 645 ) 646 647 def set_data(data: MiddleCallbackData): 648 """ 649 sets data to model 650 """ 651 nonlocal pop, scores, count_stagnation, reason_to_stop 652 653 pop, scores = data.last_generation.variables, data.last_generation.scores 654 self.population_size = pop.shape[0] 655 656 self.param.parents_portion = data.parents_portion 657 self._set_parents_count(data.parents_portion) 658 659 self.param.elit_ratio = data.elit_ratio 660 self._set_elit_count(self.population_size, data.elit_ratio) 661 662 self.prob_mut = data.mutation_prob 663 self.prob_mut_discrete = data.mutation_discrete_prob 664 self.real_mutation = data.mutation 665 self.discrete_mutation = data.mutation_discrete 666 self.crossover = data.crossover 667 self.selection = data.selection 668 669 count_stagnation = data.current_stagnation 670 reason_to_stop = data.reason_to_stop 671 self.max_stagnations = data.max_stagnation 672 673 self.set_function = data.set_function 674 675 if not callbacks: 676 total_callback = lambda g, r, lp, ls: None 677 else: 678 def total_callback( 679 generation_number: int, 680 report_list: List[float], 681 last_population: array2D, 682 last_scores: array1D 683 ): 684 for cb in callbacks: 685 cb(generation_number, report_list, last_population, last_scores) 686 687 if not middle_callbacks: 688 total_middle_callback = lambda: None 689 else: 690 def total_middle_callback(): 691 """ 692 applies callbacks and sets new data if there is a sense 693 """ 694 data = get_data() 695 flag = False 696 for cb in middle_callbacks: 697 data, has_sense = cb(data) 698 if has_sense: 699 flag = True 700 if flag: 701 set_data(data) # update global date if there was real callback step 702 703 #endregion 704 705 start_time = time.time() 706 time_is_done = ( 707 (lambda: False) 708 if time_limit_secs 
is None 709 else (lambda: int(time.time() - start_time) >= time_limit_secs) 710 ) 711 712 # combine with deprecated parts 713 function = function or self.f 714 function_timeout = function_timeout or self.funtimeout 715 716 assert function or set_function, 'no function to minimize' 717 if function: 718 self._check_function(function) 719 if function_timeout: 720 self._check_function_timeout(function_timeout) 721 722 self.set_function = set_function or GeneticAlgorithm2.default_set_function(self.f) 723 724 #region Initial population, duplicates filter, revolutionary 725 726 pop_coef, initializer_func = population_initializer 727 728 # population creator by random or with oppositions 729 if init_creator is None: 730 731 from OppOpPopInit import SampleInitializers 732 733 # just uniform random 734 self.creator = SampleInitializers.Combined( 735 minimums=[v[0] for v in self.var_bounds], 736 maximums=[v[1] for v in self.var_bounds], 737 indexes=( 738 self.indexes_int, 739 self.indexes_float 740 ), 741 creator_initializers=( 742 SampleInitializers.RandomInteger, 743 SampleInitializers.Uniform 744 ) 745 ) 746 else: 747 assert callable(init_creator) 748 self.creator = init_creator 749 750 self.init_oppositors = init_oppositors 751 self.dup_oppositor = duplicates_oppositor 752 self.revolution_oppositor = revolution_oppositor 753 754 # event for removing duplicates 755 if remove_duplicates_generation_step is None: 756 def remover(pop: array2D, scores: array1D, gen: int) -> Tuple[ 757 array2D, 758 array1D 759 ]: 760 return pop, scores 761 else: 762 763 def without_dup(pop: array2D, scores: array1D): # returns population without dups 764 _, index_of_dups = np.unique(pop, axis=0, return_index=True) 765 return pop[index_of_dups], scores[index_of_dups], scores.size - index_of_dups.size 766 767 if self.dup_oppositor is None: # if there is no dup_oppositor, use random creator 768 def remover(pop: array2D, scores: array1D, gen: int) -> Tuple[ 769 array2D, 770 array1D 771 ]: 772 if gen 
% remove_duplicates_generation_step != 0: 773 return pop, scores 774 775 pp, sc, count_to_create = without_dup(pop, scores) # pop without dups 776 777 if count_to_create == 0: 778 if SHOW_PROGRESS: 779 show_progress(t, self.max_iterations, 780 f"GA is running...{t} gen from {self.max_iterations}. No dups!") 781 return pop, scores 782 783 pp2 = SampleInitializers.CreateSamples(self.creator, count_to_create) # new pop elements 784 pp2_scores = self.set_function(pp2) # new elements values 785 786 if SHOW_PROGRESS: 787 show_progress(t, self.max_iterations, 788 f"GA is running...{t} gen from {self.max_iterations}. Kill dups!") 789 790 new_pop = np.vstack((pp, pp2)) 791 new_scores = np.concatenate((sc, pp2_scores)) 792 793 args_to_sort = new_scores.argsort() 794 return new_pop[args_to_sort], new_scores[args_to_sort] 795 796 else: # using oppositors 797 assert callable(self.dup_oppositor) 798 799 def remover(pop: np.ndarray, scores: np.ndarray, gen: int) -> Tuple[ 800 np.ndarray, 801 np.ndarray 802 ]: 803 if gen % remove_duplicates_generation_step != 0: 804 return pop, scores 805 806 pp, sc, count_to_create = without_dup(pop, scores) # pop without dups 807 808 if count_to_create == 0: 809 if SHOW_PROGRESS: 810 show_progress(t, self.max_iterations, 811 f"GA is running...{t} gen from {self.max_iterations}. No dups!") 812 return pop, scores 813 814 if count_to_create > sc.size: 815 raise Exception( 816 f"Too many duplicates at generation {gen} ({count_to_create} > {sc.size}), cannot oppose" 817 ) 818 819 # oppose count_to_create worse elements 820 pp2 = OppositionOperators.Reflect(pp[-count_to_create:], self.dup_oppositor) # new pop elements 821 pp2_scores = self.set_function(pp2) # new elements values 822 823 if SHOW_PROGRESS: 824 show_progress(t, self.max_iterations, 825 f"GA is running...{t} gen from {self.max_iterations}. 
Kill dups!") 826 827 new_pop = np.vstack((pp, pp2)) 828 new_scores = np.concatenate((sc, pp2_scores)) 829 830 args_to_sort = new_scores.argsort() 831 return new_pop[args_to_sort], new_scores[args_to_sort] 832 833 # event for revolution 834 if revolution_after_stagnation_step is None: 835 def revolution(pop: array2D, scores: array1D, stagnation_count: int) -> Tuple[ 836 array2D, 837 array1D 838 ]: 839 return pop, scores 840 else: 841 if revolution_oppositor is None: 842 raise Exception( 843 f"How can I make revolution each {revolution_after_stagnation_step} stagnation steps " 844 f"if revolution_oppositor is None (not defined)?" 845 ) 846 assert callable(revolution_oppositor) 847 848 from OppOpPopInit import OppositionOperators 849 850 def revolution(pop: array2D, scores: array1D, stagnation_count: int) -> Tuple[ 851 array2D, 852 array1D 853 ]: 854 if stagnation_count < revolution_after_stagnation_step: 855 return pop, scores 856 part = int(pop.shape[0]*revolution_part) 857 858 pp2 = OppositionOperators.Reflect(pop[-part:], self.revolution_oppositor) 859 pp2_scores = self.set_function(pp2) 860 861 combined = np.vstack((pop, pp2)) 862 combined_scores = np.concatenate((scores, pp2_scores)) 863 args = combined_scores.argsort() 864 865 if SHOW_PROGRESS: 866 show_progress(t, self.max_iterations, 867 f"GA is running...{t} gen from {self.max_iterations}. 
Revolution!") 868 869 args = args[:scores.size] 870 return combined[args], combined_scores[args] 871 872 #enregion 873 874 # 875 # 876 # START ALGORITHM LOGIC 877 # 878 # 879 880 # Report 881 self._clear_report() # clear old report objects 882 self._init_report() 883 884 # initialization of pop 885 886 if start_generation.variables is None: 887 888 from OppOpPopInit import init_population 889 890 real_pop_size = self.population_size * pop_coef 891 892 # pop = np.empty((real_pop_size, self.dim)) 893 scores = np.empty(real_pop_size) 894 895 pop = init_population( 896 samples_count=real_pop_size, 897 creator=self.creator, 898 oppositors=self.init_oppositors 899 ) 900 901 if self.funtimeout and self.funtimeout > 0: # perform simulation 902 903 time_counter = 0 904 905 for p in range(0, real_pop_size): 906 # simulation returns exception or func value -- check the time of evaluating 907 value, eval_time = self._simulate(pop[p]) 908 scores[p] = value 909 time_counter += eval_time 910 911 if enable_printing: 912 print( 913 f"\nSim: Average time of function evaluating (secs): " 914 f"{time_counter/real_pop_size} (total = {time_counter})\n" 915 ) 916 else: 917 918 eval_time = time.time() 919 scores = self.set_function(pop) 920 eval_time = time.time() - eval_time 921 if enable_printing: 922 print( 923 f"\nSet: Average time of function evaluating (secs): " 924 f"{eval_time/real_pop_size} (total = {eval_time})\n" 925 ) 926 927 else: 928 929 self.population_size = start_generation.variables.shape[0] 930 self._set_elit_count(self.population_size, self.param.elit_ratio) 931 self._set_parents_count(self.param.parents_portion) 932 933 pop = start_generation.variables 934 935 if start_generation.scores is None: 936 937 _time = time.time() 938 scores = self.set_function(pop) 939 _time = time.time() - _time 940 941 if enable_printing: 942 print( 943 f'\nFirst scores are made from gotten variables ' 944 f'(by {_time} secs, about {_time/scores.size} for each creature)\n' 945 ) 946 else: 
947 scores = start_generation.scores 948 if enable_printing: 949 print(f"\nFirst scores are from gotten population\n") 950 951 # Initialization by select bests and local_descent 952 953 pop, scores = initializer_func(pop, scores) 954 955 # first sort 956 args_to_sort = scores.argsort() 957 pop = pop[args_to_sort] 958 scores = scores[args_to_sort] 959 self._update_report(scores) 960 961 self.population_size = scores.size 962 self.best_function = scores[0] 963 964 if enable_printing: 965 print( 966 f"Best score before optimization: {self.best_function}" 967 ) 968 969 t: int = 1 970 count_stagnation: int = 0 971 """iterations without progress""" 972 reason_to_stop: Optional[str] = None 973 974 # gets indexes of 2 parents to crossover 975 if studEA: 976 get_parents_inds = lambda: (0, random.randrange(1, self.parents_count)) 977 else: 978 get_parents_inds = lambda: random_indexes_pair(self.parents_count) 979 980 # while no reason to stop 981 while True: 982 983 if count_stagnation > self.max_stagnations: 984 reason_to_stop = f"limit of fails: {count_stagnation}" 985 elif t == self.max_iterations: 986 reason_to_stop = f'limit of iterations: {t}' 987 elif stop_by_val(self.best_function): 988 reason_to_stop = f"stop value reached: {self.best_function} <= {stop_when_reached}" 989 elif time_is_done(): 990 reason_to_stop = f'time is done: {time.time() - start_time} >= {time_limit_secs}' 991 992 if reason_to_stop is not None: 993 if SHOW_PROGRESS: 994 show_progress(t, self.max_iterations, f"GA is running... STOP! 
{reason_to_stop}") 995 break 996 997 if scores[0] < self.best_function: # if there is progress 998 count_stagnation = 0 999 self.best_function = scores[0] 1000 else: 1001 count_stagnation += 1 1002 1003 if SHOW_PROGRESS: 1004 show_progress( 1005 t, 1006 self.max_iterations, 1007 f"GA is running...{t} gen from {self.max_iterations}...best value = {self.best_function}" 1008 ) 1009 1010 # Select parents 1011 1012 par: array2D = np.empty((self.parents_count, self.dim)) 1013 """samples chosen to create new samples""" 1014 par_scores: array1D = np.empty(self.parents_count) 1015 1016 elit_slice = slice(None, self.elit_count) 1017 """elit parents""" 1018 1019 # copy needs because the generation will be removed after parents selection 1020 par[elit_slice] = pop[elit_slice].copy() 1021 par_scores[elit_slice] = scores[elit_slice].copy() 1022 1023 new_par_inds = ( 1024 self.selection( 1025 scores[self.elit_count:], 1026 self.parents_count - self.elit_count 1027 ).astype(np.int16) + self.elit_count 1028 ) 1029 """non-elit parents indexes""" 1030 #new_par_inds = self.selection(scores, self.parents_count - self.elit_count).astype(np.int16) 1031 par_slice = slice(self.elit_count, self.parents_count) 1032 par[par_slice] = pop[new_par_inds].copy() 1033 par_scores[par_slice] = scores[new_par_inds].copy() 1034 1035 pop = np.empty((self.population_size, self.dim)) 1036 """new generation""" 1037 scores = np.empty(self.population_size) 1038 """new generation scores""" 1039 1040 parents_slice = slice(None, self.parents_count) 1041 pop[parents_slice] = par 1042 scores[parents_slice] = par_scores 1043 1044 if self.fill_children is None: # default fill children behaviour 1045 DO_MUTATION = self.needs_mutation 1046 for k in range(self.parents_count, self.population_size, 2): 1047 1048 r1, r2 = get_parents_inds() 1049 1050 pvar1 = pop[r1] # equal to par[r1], but better for cache 1051 pvar2 = pop[r2] 1052 1053 ch1, ch2 = self.crossover(pvar1, pvar2) 1054 1055 if DO_MUTATION: 1056 ch1 = 
self.mut(ch1) 1057 ch2 = self.mut_middle(ch2, pvar1, pvar2) 1058 1059 pop[k] = ch1 1060 pop[k+1] = ch2 1061 else: # custom behaviour 1062 self.fill_children(pop, self.parents_count) 1063 1064 if apply_function_to_parents: 1065 scores = self.set_function(pop) 1066 else: 1067 scores[self.parents_count:] = self.set_function(pop[self.parents_count:]) 1068 1069 # remove duplicates 1070 pop, scores = remover(pop, scores, t) 1071 # revolution 1072 pop, scores = revolution(pop, scores, count_stagnation) 1073 1074 # make population better 1075 args_to_sort = scores.argsort() 1076 pop = pop[args_to_sort] 1077 scores = scores[args_to_sort] 1078 self._update_report(scores) 1079 1080 # callback it 1081 total_callback(t, self.report, pop, scores) 1082 total_middle_callback() 1083 1084 t += 1 1085 1086 self.best_function = fast_min(scores[0], self.best_function) 1087 1088 last_generation = Generation(pop, scores) 1089 self.result = GAResult(last_generation) 1090 1091 if save_last_generation_as is not None: 1092 last_generation.save(save_last_generation_as) 1093 1094 if enable_printing: 1095 show = ' ' * 200 1096 sys.stdout.write( 1097 f'\r{show}\n' 1098 f'\r The best found solution:\n {pop[0]}' 1099 f'\n\n Objective function:\n {self.best_function}\n' 1100 f'\n Used generations: {len(self.report)}' 1101 f'\n Used time: {time.time() - start_time:.3g} seconds\n' 1102 ) 1103 sys.stdout.flush() 1104 1105 if not no_plot: 1106 self.plot_results() 1107 1108 if enable_printing: 1109 if reason_to_stop is not None and 'iterations' not in reason_to_stop: 1110 sys.stdout.write( 1111 f'\nWarning: GA is terminated because of {reason_to_stop}' 1112 ) 1113 1114 return self.result 1115 1116 #endregion 1117 1118 #region PLOTTING 1119 1120 def plot_results( 1121 self, 1122 title: str = 'Genetic Algorithm', 1123 save_as: Optional[str] = None, 1124 dpi: int = 200, 1125 main_color: str = 'blue' 1126 ): 1127 """ 1128 Simple plot of self.report (if not empty) 1129 """ 1130 if len(self.report) == 0: 1131 
sys.stdout.write("No results to plot!\n") 1132 return 1133 1134 plot_several_lines( 1135 lines=[self.report], 1136 colors=[main_color], 1137 labels=['best of generation'], 1138 linewidths=[2], 1139 title=title, 1140 xlabel='Generation', 1141 ylabel='Minimized function', 1142 save_as=save_as, 1143 dpi=dpi 1144 ) 1145 1146 def plot_generation_scores(self, title: str = 'Last generation scores', save_as: Optional[str] = None): 1147 """ 1148 Plots barplot of scores of last population 1149 """ 1150 1151 if not hasattr(self, 'result'): 1152 raise Exception( 1153 "There is no 'result' field into ga object! Before plotting generation u need to run seaching process" 1154 ) 1155 1156 plot_pop_scores(self.result.last_generation.scores, title, save_as) 1157 1158 #endregion 1159 1160 #region MUTATION 1161 def mut(self, x: array1D): 1162 """ 1163 just mutation 1164 """ 1165 1166 for i in self.indexes_int_mut: 1167 if random.random() < self.prob_mut_discrete: 1168 x[i] = self.discrete_mutation(x[i], *self.var_bounds[i]) 1169 1170 for i in self.indexes_float_mut: 1171 if random.random() < self.prob_mut: 1172 x[i] = self.real_mutation(x[i], *self.var_bounds[i]) 1173 1174 return x 1175 1176 def mut_middle(self, x: array1D, p1: array1D, p2: array1D): 1177 """ 1178 mutation oriented on parents 1179 """ 1180 for i in self.indexes_int_mut: 1181 1182 if random.random() < self.prob_mut_discrete: 1183 v1, v2 = p1[i], p2[i] 1184 if v1 < v2: 1185 x[i] = random.randint(v1, v2) 1186 elif v1 > v2: 1187 x[i] = random.randint(v2, v1) 1188 else: 1189 x[i] = random.randint(*self.var_bounds[i]) 1190 1191 for i in self.indexes_float_mut: 1192 if random.random() < self.prob_mut: 1193 v1, v2 = p1[i], p2[i] 1194 if v1 != v2: 1195 x[i] = random.uniform(v1, v2) 1196 else: 1197 x[i] = random.uniform(*self.var_bounds[i]) 1198 return x 1199 1200 #endregion 1201 1202 #region Set functions 1203 1204 @staticmethod 1205 def default_set_function(function_for_set: Callable[[array1D], float]): 1206 """ 1207 simple 
function for creating set_function 1208 function_for_set just applies to each row of population 1209 """ 1210 def func(matrix: array2D): 1211 return np.array( 1212 [function_for_set(row) for row in matrix] 1213 ) 1214 return func 1215 1216 @staticmethod 1217 def vectorized_set_function(function_for_set: Callable[[array1D], float]): 1218 """ 1219 works like default, but faster for big populations and slower for little 1220 function_for_set just applyes to each row of population 1221 """ 1222 1223 func = np.vectorize(function_for_set, signature='(n)->()') 1224 return func 1225 1226 @staticmethod 1227 def set_function_multiprocess(function_for_set: Callable[[array1D], float], n_jobs: int = -1): 1228 """ 1229 like function_for_set but uses joblib with n_jobs (-1 goes to count of available processors) 1230 """ 1231 try: 1232 from joblib import Parallel, delayed 1233 except ModuleNotFoundError: 1234 raise ModuleNotFoundError( 1235 "this additional feature requires joblib package," 1236 "run `pip install joblib` or `pip install --upgrade geneticalgorithm2[full]`" 1237 "or use another set function" 1238 ) 1239 1240 def func(matrix: array2D): 1241 result = Parallel(n_jobs=n_jobs)(delayed(function_for_set)(matrix[i]) for i in range(matrix.shape[0])) 1242 return np.array(result) 1243 return func 1244 1245 #endregion
Genetic algorithm optimization process
def __init__(
    self,
    function: FunctionToMinimize = None,

    dimension: int = 0,
    variable_type: Union[VARIABLE_TYPE, Sequence[VARIABLE_TYPE]] = 'bool',
    variable_boundaries: Optional[Union[array2D, Sequence[Tuple[float, float]]]] = None,

    variable_type_mixed=None,

    function_timeout: Optional[float] = None,
    algorithm_parameters: Union[AlgorithmParams, Dict[str, Any]] = default_params
):
    """
    initializes the GA object and performs main checks

    Args:
        function: the given objective function to be minimized -- deprecated and moved to run() method
        dimension: the number of decision variables, the population samples dimension

        variable_type: a single string sets the type of all variables;
            for mixed types use a sequence with one type string per variable

        variable_boundaries: leave it None if variable_type is 'bool';
            otherwise provide a sequence of tuples of length two as boundaries for each variable;
            the length of the sequence must be equal to dimension.
            For example, ([0,100], [0,200]) sets
            lower boundary 0 and upper boundary 100 for the first variable,
            lower boundary 0 and upper boundary 200 for the second variable,
            and dimension must be 2.

        variable_type_mixed -- deprecated, overrides variable_type when it is not None

        function_timeout: if the given function does not provide
            output before function_timeout (unit is seconds) the algorithm raises error.
            For example, when there is an infinite loop in the given function.
            `None` means disabling -- deprecated and moved to run()

        algorithm_parameters: AlgorithmParams object or usual dictionary with algorithm parameters;
            it is not mandatory to provide all possible parameters

    Raises:
        AssertionError: if algorithm_parameters has a wrong type, dimension is not positive,
            a mutation probability is not a valid probability
            or the parents count is less than the elits count

    Notes:
        - This implementation minimizes the given objective function.
        For maximization you can multiply the function by -1 (for instance): the absolute
            value of the output would be the actual objective function
        - mutation_discrete_probability equal to None falls back to mutation_probability;
            an explicit 0 is kept as 0 (discrete mutation disabled)

    for more details and examples of implementation please visit:
        https://github.com/PasaOpasen/geneticalgorithm2

    """

    # all default fields

    # fields below marked as comments are assigned later in this constructor
    # self.crossover: Callable[[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray]] = None
    # self.real_mutation: Callable[[float, float, float], float] = None
    # self.discrete_mutation: Callable[[int, int, int], int] = None
    # self.selection: Callable[[np.ndarray, int], np.ndarray] = None

    self.result = None
    self.best_function = None

    self.revolution_oppositor = None
    self.dup_oppositor = None
    self.creator = None
    self.init_oppositors = None

    self.f: Callable[[array1D], float] = None
    self.funtimeout: float = None

    self.set_function: Callable[[np.ndarray], np.ndarray] = None

    # self.dim: int = None
    self.var_bounds: List[Tuple[Union[int, float], Union[int, float]]] = None
    # self.indexes_int: np.ndarray = None
    # self.indexes_float: np.ndarray = None

    self.checked_reports: List[Tuple[str, Callable[[array1D], None]]] = None

    self.population_size: int = None
    self.progress_stream = None

    # input algorithm's parameters

    assert isinstance(algorithm_parameters, (dict, AlgorithmParams)), (
        "algorithm_parameters must be dict or AlgorithmParams object"
    )
    if not isinstance(algorithm_parameters, AlgorithmParams):
        # plain dictionaries are converted to the params object first
        algorithm_parameters = AlgorithmParams.from_dict(algorithm_parameters)
    algorithm_parameters.validate()
    self.param = algorithm_parameters

    (
        self.crossover,
        self.real_mutation,
        self.discrete_mutation,
        self.selection
    ) = algorithm_parameters.get_CMS_funcs()

    # dimension and area bounds
    self.dim = int(dimension)
    assert self.dim > 0, f"dimension count must be int and >0, gotten {dimension}"

    if variable_type_mixed is not None:
        warnings.warn(
            f"argument variable_type_mixed is deprecated and will be removed at version 7\n "
            f"use variable_type={tuple(variable_type_mixed)} instead"
        )
        variable_type = variable_type_mixed
    self._set_types_indexes(variable_type)  # types indexes
    self._set_var_boundaries(variable_type, variable_boundaries)  # input variables' boundaries

    # fix mutation probs

    assert can_be_prob(self.param.mutation_probability)
    self.prob_mut = self.param.mutation_probability

    discrete_prob = self.param.mutation_discrete_probability
    assert discrete_prob is None or can_be_prob(discrete_prob)
    # only None means "not set": `discrete_prob or self.prob_mut` would also replace
    # an explicit 0 and make it impossible to disable the discrete mutation alone
    self.prob_mut_discrete = self.prob_mut if discrete_prob is None else discrete_prob

    if self.param.crossover_probability is not None:
        warnings.warn(
            "crossover_probability is deprecated and will be removed in version 7. "
            "Reason: it's old and has no sense"
        )

    #############################################################
    # input function
    if function:
        warnings.warn(
            "function is deprecated in init constructor and will be removed in version 7. "
            "Move this argument to run() method"
        )
        self._check_function(function)
    if function_timeout:
        warnings.warn(
            "function_timeout is deprecated in init constructor and will be removed in version 7. "
            "Move this argument to run() method"
        )
        self._check_function_timeout(function_timeout)

    #############################################################

    self.population_size = int(self.param.population_size)
    self._set_parents_count(self.param.parents_portion)
    self._set_elit_count(self.population_size, self.param.elit_ratio)
    # equal counts are allowed, so the message says "not less" to match the check
    assert self.parents_count >= self.elit_count, (
        f"\n number of parents ({self.parents_count}) "
        f"must be not less than number of elits ({self.elit_count})"
    )

    self._set_max_iterations()

    self._set_report()

    # specify this function to speed up or change default behaviour
    self.fill_children: Optional[Callable[[array2D, int], None]] = None
    """
    custom function which adds children for population POP 
        where POP[:parents_count] are parents lines and next lines are for children
    """
initializes the GA object and performs main checks
Arguments:
- function: the given objective function to be minimized -- deprecated and moved to run() method
- dimension: the number of decision variables, the population samples dimension
- variable_type: a single string sets the variable type for all variables; for mixed types use a sequence with one type string per variable
- variable_boundaries: leave it None if variable_type is 'bool'; otherwise provide a sequence of tuples of length two as boundaries for each variable; the length of the sequence must be equal to dimension. For example, ([0,100], [0,200]) sets lower boundary 0 and upper boundary 100 for the first variable and lower boundary 0 and upper boundary 200 for the second variable, and dimension must be 2.
- variable_type_mixed -- deprecated
- function_timeout: if the given function does not provide output before function_timeout (unit is seconds) the algorithm raises an error. For example, when there is an infinite loop in the given function. `None` means disabling -- deprecated and moved to run()
- algorithm_parameters: AlgorithmParams object or usual dictionary with algorithm parameters; it is not mandatory to provide all possible parameters
Notes:
- This implementation minimizes the given objective function. For maximization you can multiply the function by -1 (for instance): the absolute value of the output would be the actual objective function
for more details and examples of implementation please visit: https://github.com/PasaOpasen/geneticalgorithm2
@property
def needs_mutation(self) -> bool:
    """tells if at least one of mutation probabilities (real or discrete) is positive"""
    real_is_active = self.prob_mut > 0
    if real_is_active:
        # the real-valued mutation alone is enough, no need to look at the discrete one
        return real_is_active
    return self.prob_mut_discrete > 0
whether the mutation is required
custom function which adds children for population POP where POP[:parents_count] are parents lines and next lines are for children
445 def run( 446 self, 447 no_plot: bool = False, 448 disable_printing: bool = False, 449 progress_bar_stream: Optional[str] = 'stdout', 450 451 # deprecated 452 disable_progress_bar: bool = False, 453 454 function: FunctionToMinimize = None, 455 function_timeout: Optional[float] = None, 456 457 set_function: SetFunctionToMinimize = None, 458 apply_function_to_parents: bool = False, 459 start_generation: GenerationConvertible = Generation(), 460 studEA: bool = False, 461 mutation_indexes: Optional[Iterable[int]] = None, 462 463 init_creator: Optional[CreatorFunc] = None, 464 init_oppositors: Optional[Sequence[OppositorFunc]] = None, 465 466 duplicates_oppositor: Optional[OppositorFunc] = None, 467 remove_duplicates_generation_step: Optional[int] = None, 468 469 revolution_oppositor: Optional[OppositorFunc] = None, 470 revolution_after_stagnation_step: Optional[int] = None, 471 revolution_part: float = 0.3, 472 473 population_initializer: Tuple[ 474 int, PopulationModifier 475 ] = get_population_initializer(select_best_of=1, local_optimization_step='never', local_optimizer=None), 476 477 stop_when_reached: Optional[float] = None, 478 callbacks: Optional[Sequence[SimpleCallbackFunc]] = None, 479 middle_callbacks: Optional[Sequence[MiddleCallbackFunc]] = None, #+ 480 time_limit_secs: Optional[float] = None, 481 save_last_generation_as: Optional[str] = None, 482 seed: Optional[int] = None 483 ) -> GAResult: 484 """ 485 runs optimization process 486 487 Args: 488 no_plot: do not plot results using matplotlib by default 489 490 disable_printing: do not print log info of optimization process (except progress bar) 491 492 progress_bar_stream: 'stdout', 'stderr' or None to disable progress bar; 493 disabling progress bar can speed up the optimization process in many cases 494 495 disable_progress_bar: deprecated 496 497 function: the given objective function (sample -> its score) to be minimized; 498 499 function_timeout: if the given function does not provide 500 output 
before function_timeout (unit is seconds) the algorithm raises error. 501 For example, when there is an infinite loop in the given function. 502 `None` means disabling 503 504 set_function: set function (all samples -> score per sample) to be used instead of usual function 505 (usually for optimization purposes) 506 507 apply_function_to_parents: whether to apply function to parents from previous generation (if it's needed), 508 it can be needed at working with games agents, but for other tasks will just waste time 509 510 start_generation: initial generation object of any `GenerationConvertible` type 511 512 studEA: using stud EA strategy (crossover with best object always) 513 514 mutation_indexes: indexes of dimensions where mutation can be performed (all dimensions by default) 515 516 init_creator: the function creates population samples. 517 By default -- random uniform for real variables and random uniform for int 518 init_oppositors: the list of oppositors creates oppositions for base population. No by default 519 520 duplicates_oppositor: oppositor for applying after duplicates removing. 521 By default -- using just random initializer from creator 522 remove_duplicates_generation_step: step for removing duplicates (have a sense with discrete tasks). 523 No by default 524 525 revolution_oppositor: oppositor for revolution time. No by default 526 revolution_after_stagnation_step: create revolution after this generations of stagnation. No by default 527 revolution_part: float, the part of generation to being oppose. By default is 0.3 528 529 population_initializer: object for actions at population initialization step 530 to create better start population. 
See doc 531 532 stop_when_reached: stop searching after reaching this value 533 (it can be potential minimum or something else) 534 535 callbacks: sequence of callback functions with structure: 536 (generation_number, report_list, last_population, last_scores) -> do some action 537 538 middle_callbacks: sequence of functions made by `MiddleCallback` class 539 540 time_limit_secs: limit time of working (in seconds); 541 `None` means no time limit (limit will be only for count of generation and so on) 542 543 save_last_generation_as: path to .npz file 544 for saving last_generation as numpy dictionary like 545 {'population': 2D-array, 'scores': 1D-array}; 546 None disables this option 547 548 seed: random seed (None if doesn't matter) 549 550 Returns: 551 `GAResult` object; 552 also fills the self.report and self.result with many report information 553 554 Notes: 555 - if `function_timeout` is enabled then `function` must be set 556 - it would be more logical to use params like `studEA` as an algorithm parameter, 557 but `run()`-way can be more convenient for real using 558 """ 559 560 if disable_progress_bar: 561 warnings.warn( 562 f"disable_progress_bar is deprecated and will be removed in version 7, " 563 f"use probress_bar_stream=None to disable progress bar" 564 ) 565 progress_bar_stream = None 566 567 enable_printing: bool = not disable_printing 568 569 start_generation = Generation.from_object(self.dim, start_generation) 570 571 assert is_current_gen_number(revolution_after_stagnation_step), "must be None or int and >0" 572 assert is_current_gen_number(remove_duplicates_generation_step), "must be None or int and >0" 573 assert can_be_prob(revolution_part), f"revolution_part must be in [0,1], not {revolution_part}" 574 assert stop_when_reached is None or isinstance(stop_when_reached, (int, float)) 575 assert isinstance(callbacks, collections.abc.Sequence) or callbacks is None, ( 576 "callbacks should be a list of callbacks functions" 577 ) 578 assert 
isinstance(middle_callbacks, collections.abc.Sequence) or middle_callbacks is None, ( 579 "middle_callbacks should be list of MiddleCallbacks functions" 580 ) 581 assert time_limit_secs is None or time_limit_secs > 0, 'time_limit_secs must be None of number > 0' 582 583 self._set_mutation_indexes(mutation_indexes) 584 from OppOpPopInit import set_seed 585 set_seed(seed) 586 587 # randomstate = np.random.default_rng(random.randint(0, 1000) if seed is None else seed) 588 # self.randomstate = randomstate 589 590 # using bool flag is faster than using empty function with generated string arguments 591 SHOW_PROGRESS = progress_bar_stream is not None 592 if SHOW_PROGRESS: 593 594 show_progress = lambda t, t2, s: self._progress(t, t2, status=s) 595 596 if progress_bar_stream == 'stdout': 597 self.progress_stream = sys.stdout 598 elif progress_bar_stream == 'stderr': 599 self.progress_stream = sys.stderr 600 else: 601 raise Exception( 602 f"wrong value {progress_bar_stream} of progress_bar_stream, must be 'stdout'/'stderr'/None" 603 ) 604 else: 605 show_progress = None 606 607 stop_by_val = ( 608 (lambda best_f: False) 609 if stop_when_reached is None 610 else (lambda best_f: best_f <= stop_when_reached) 611 ) 612 613 t: int = 0 614 count_stagnation: int = 0 615 pop: array2D = None 616 scores: array1D = None 617 618 #region CALLBACKS 619 620 def get_data(): 621 """ 622 returns all important data about model 623 """ 624 return MiddleCallbackData( 625 last_generation=Generation(pop, scores), 626 current_generation=t, 627 report_list=self.report, 628 629 mutation_prob=self.prob_mut, 630 mutation_discrete_prob=self.prob_mut_discrete, 631 mutation=self.real_mutation, 632 mutation_discrete=self.discrete_mutation, 633 crossover=self.crossover, 634 selection=self.selection, 635 636 current_stagnation=count_stagnation, 637 max_stagnation=self.max_stagnations, 638 639 parents_portion=self.param.parents_portion, 640 elit_ratio=self.param.elit_ratio, 641 642 
set_function=self.set_function, 643 644 reason_to_stop=reason_to_stop 645 ) 646 647 def set_data(data: MiddleCallbackData): 648 """ 649 sets data to model 650 """ 651 nonlocal pop, scores, count_stagnation, reason_to_stop 652 653 pop, scores = data.last_generation.variables, data.last_generation.scores 654 self.population_size = pop.shape[0] 655 656 self.param.parents_portion = data.parents_portion 657 self._set_parents_count(data.parents_portion) 658 659 self.param.elit_ratio = data.elit_ratio 660 self._set_elit_count(self.population_size, data.elit_ratio) 661 662 self.prob_mut = data.mutation_prob 663 self.prob_mut_discrete = data.mutation_discrete_prob 664 self.real_mutation = data.mutation 665 self.discrete_mutation = data.mutation_discrete 666 self.crossover = data.crossover 667 self.selection = data.selection 668 669 count_stagnation = data.current_stagnation 670 reason_to_stop = data.reason_to_stop 671 self.max_stagnations = data.max_stagnation 672 673 self.set_function = data.set_function 674 675 if not callbacks: 676 total_callback = lambda g, r, lp, ls: None 677 else: 678 def total_callback( 679 generation_number: int, 680 report_list: List[float], 681 last_population: array2D, 682 last_scores: array1D 683 ): 684 for cb in callbacks: 685 cb(generation_number, report_list, last_population, last_scores) 686 687 if not middle_callbacks: 688 total_middle_callback = lambda: None 689 else: 690 def total_middle_callback(): 691 """ 692 applies callbacks and sets new data if there is a sense 693 """ 694 data = get_data() 695 flag = False 696 for cb in middle_callbacks: 697 data, has_sense = cb(data) 698 if has_sense: 699 flag = True 700 if flag: 701 set_data(data) # update global date if there was real callback step 702 703 #endregion 704 705 start_time = time.time() 706 time_is_done = ( 707 (lambda: False) 708 if time_limit_secs is None 709 else (lambda: int(time.time() - start_time) >= time_limit_secs) 710 ) 711 712 # combine with deprecated parts 713 function = 
function or self.f 714 function_timeout = function_timeout or self.funtimeout 715 716 assert function or set_function, 'no function to minimize' 717 if function: 718 self._check_function(function) 719 if function_timeout: 720 self._check_function_timeout(function_timeout) 721 722 self.set_function = set_function or GeneticAlgorithm2.default_set_function(self.f) 723 724 #region Initial population, duplicates filter, revolutionary 725 726 pop_coef, initializer_func = population_initializer 727 728 # population creator by random or with oppositions 729 if init_creator is None: 730 731 from OppOpPopInit import SampleInitializers 732 733 # just uniform random 734 self.creator = SampleInitializers.Combined( 735 minimums=[v[0] for v in self.var_bounds], 736 maximums=[v[1] for v in self.var_bounds], 737 indexes=( 738 self.indexes_int, 739 self.indexes_float 740 ), 741 creator_initializers=( 742 SampleInitializers.RandomInteger, 743 SampleInitializers.Uniform 744 ) 745 ) 746 else: 747 assert callable(init_creator) 748 self.creator = init_creator 749 750 self.init_oppositors = init_oppositors 751 self.dup_oppositor = duplicates_oppositor 752 self.revolution_oppositor = revolution_oppositor 753 754 # event for removing duplicates 755 if remove_duplicates_generation_step is None: 756 def remover(pop: array2D, scores: array1D, gen: int) -> Tuple[ 757 array2D, 758 array1D 759 ]: 760 return pop, scores 761 else: 762 763 def without_dup(pop: array2D, scores: array1D): # returns population without dups 764 _, index_of_dups = np.unique(pop, axis=0, return_index=True) 765 return pop[index_of_dups], scores[index_of_dups], scores.size - index_of_dups.size 766 767 if self.dup_oppositor is None: # if there is no dup_oppositor, use random creator 768 def remover(pop: array2D, scores: array1D, gen: int) -> Tuple[ 769 array2D, 770 array1D 771 ]: 772 if gen % remove_duplicates_generation_step != 0: 773 return pop, scores 774 775 pp, sc, count_to_create = without_dup(pop, scores) # pop 
without dups 776 777 if count_to_create == 0: 778 if SHOW_PROGRESS: 779 show_progress(t, self.max_iterations, 780 f"GA is running...{t} gen from {self.max_iterations}. No dups!") 781 return pop, scores 782 783 pp2 = SampleInitializers.CreateSamples(self.creator, count_to_create) # new pop elements 784 pp2_scores = self.set_function(pp2) # new elements values 785 786 if SHOW_PROGRESS: 787 show_progress(t, self.max_iterations, 788 f"GA is running...{t} gen from {self.max_iterations}. Kill dups!") 789 790 new_pop = np.vstack((pp, pp2)) 791 new_scores = np.concatenate((sc, pp2_scores)) 792 793 args_to_sort = new_scores.argsort() 794 return new_pop[args_to_sort], new_scores[args_to_sort] 795 796 else: # using oppositors 797 assert callable(self.dup_oppositor) 798 799 def remover(pop: np.ndarray, scores: np.ndarray, gen: int) -> Tuple[ 800 np.ndarray, 801 np.ndarray 802 ]: 803 if gen % remove_duplicates_generation_step != 0: 804 return pop, scores 805 806 pp, sc, count_to_create = without_dup(pop, scores) # pop without dups 807 808 if count_to_create == 0: 809 if SHOW_PROGRESS: 810 show_progress(t, self.max_iterations, 811 f"GA is running...{t} gen from {self.max_iterations}. No dups!") 812 return pop, scores 813 814 if count_to_create > sc.size: 815 raise Exception( 816 f"Too many duplicates at generation {gen} ({count_to_create} > {sc.size}), cannot oppose" 817 ) 818 819 # oppose count_to_create worse elements 820 pp2 = OppositionOperators.Reflect(pp[-count_to_create:], self.dup_oppositor) # new pop elements 821 pp2_scores = self.set_function(pp2) # new elements values 822 823 if SHOW_PROGRESS: 824 show_progress(t, self.max_iterations, 825 f"GA is running...{t} gen from {self.max_iterations}. 
Kill dups!") 826 827 new_pop = np.vstack((pp, pp2)) 828 new_scores = np.concatenate((sc, pp2_scores)) 829 830 args_to_sort = new_scores.argsort() 831 return new_pop[args_to_sort], new_scores[args_to_sort] 832 833 # event for revolution 834 if revolution_after_stagnation_step is None: 835 def revolution(pop: array2D, scores: array1D, stagnation_count: int) -> Tuple[ 836 array2D, 837 array1D 838 ]: 839 return pop, scores 840 else: 841 if revolution_oppositor is None: 842 raise Exception( 843 f"How can I make revolution each {revolution_after_stagnation_step} stagnation steps " 844 f"if revolution_oppositor is None (not defined)?" 845 ) 846 assert callable(revolution_oppositor) 847 848 from OppOpPopInit import OppositionOperators 849 850 def revolution(pop: array2D, scores: array1D, stagnation_count: int) -> Tuple[ 851 array2D, 852 array1D 853 ]: 854 if stagnation_count < revolution_after_stagnation_step: 855 return pop, scores 856 part = int(pop.shape[0]*revolution_part) 857 858 pp2 = OppositionOperators.Reflect(pop[-part:], self.revolution_oppositor) 859 pp2_scores = self.set_function(pp2) 860 861 combined = np.vstack((pop, pp2)) 862 combined_scores = np.concatenate((scores, pp2_scores)) 863 args = combined_scores.argsort() 864 865 if SHOW_PROGRESS: 866 show_progress(t, self.max_iterations, 867 f"GA is running...{t} gen from {self.max_iterations}. 
Revolution!") 868 869 args = args[:scores.size] 870 return combined[args], combined_scores[args] 871 872 #enregion 873 874 # 875 # 876 # START ALGORITHM LOGIC 877 # 878 # 879 880 # Report 881 self._clear_report() # clear old report objects 882 self._init_report() 883 884 # initialization of pop 885 886 if start_generation.variables is None: 887 888 from OppOpPopInit import init_population 889 890 real_pop_size = self.population_size * pop_coef 891 892 # pop = np.empty((real_pop_size, self.dim)) 893 scores = np.empty(real_pop_size) 894 895 pop = init_population( 896 samples_count=real_pop_size, 897 creator=self.creator, 898 oppositors=self.init_oppositors 899 ) 900 901 if self.funtimeout and self.funtimeout > 0: # perform simulation 902 903 time_counter = 0 904 905 for p in range(0, real_pop_size): 906 # simulation returns exception or func value -- check the time of evaluating 907 value, eval_time = self._simulate(pop[p]) 908 scores[p] = value 909 time_counter += eval_time 910 911 if enable_printing: 912 print( 913 f"\nSim: Average time of function evaluating (secs): " 914 f"{time_counter/real_pop_size} (total = {time_counter})\n" 915 ) 916 else: 917 918 eval_time = time.time() 919 scores = self.set_function(pop) 920 eval_time = time.time() - eval_time 921 if enable_printing: 922 print( 923 f"\nSet: Average time of function evaluating (secs): " 924 f"{eval_time/real_pop_size} (total = {eval_time})\n" 925 ) 926 927 else: 928 929 self.population_size = start_generation.variables.shape[0] 930 self._set_elit_count(self.population_size, self.param.elit_ratio) 931 self._set_parents_count(self.param.parents_portion) 932 933 pop = start_generation.variables 934 935 if start_generation.scores is None: 936 937 _time = time.time() 938 scores = self.set_function(pop) 939 _time = time.time() - _time 940 941 if enable_printing: 942 print( 943 f'\nFirst scores are made from gotten variables ' 944 f'(by {_time} secs, about {_time/scores.size} for each creature)\n' 945 ) 946 else: 
947 scores = start_generation.scores 948 if enable_printing: 949 print(f"\nFirst scores are from gotten population\n") 950 951 # Initialization by select bests and local_descent 952 953 pop, scores = initializer_func(pop, scores) 954 955 # first sort 956 args_to_sort = scores.argsort() 957 pop = pop[args_to_sort] 958 scores = scores[args_to_sort] 959 self._update_report(scores) 960 961 self.population_size = scores.size 962 self.best_function = scores[0] 963 964 if enable_printing: 965 print( 966 f"Best score before optimization: {self.best_function}" 967 ) 968 969 t: int = 1 970 count_stagnation: int = 0 971 """iterations without progress""" 972 reason_to_stop: Optional[str] = None 973 974 # gets indexes of 2 parents to crossover 975 if studEA: 976 get_parents_inds = lambda: (0, random.randrange(1, self.parents_count)) 977 else: 978 get_parents_inds = lambda: random_indexes_pair(self.parents_count) 979 980 # while no reason to stop 981 while True: 982 983 if count_stagnation > self.max_stagnations: 984 reason_to_stop = f"limit of fails: {count_stagnation}" 985 elif t == self.max_iterations: 986 reason_to_stop = f'limit of iterations: {t}' 987 elif stop_by_val(self.best_function): 988 reason_to_stop = f"stop value reached: {self.best_function} <= {stop_when_reached}" 989 elif time_is_done(): 990 reason_to_stop = f'time is done: {time.time() - start_time} >= {time_limit_secs}' 991 992 if reason_to_stop is not None: 993 if SHOW_PROGRESS: 994 show_progress(t, self.max_iterations, f"GA is running... STOP! 
{reason_to_stop}") 995 break 996 997 if scores[0] < self.best_function: # if there is progress 998 count_stagnation = 0 999 self.best_function = scores[0] 1000 else: 1001 count_stagnation += 1 1002 1003 if SHOW_PROGRESS: 1004 show_progress( 1005 t, 1006 self.max_iterations, 1007 f"GA is running...{t} gen from {self.max_iterations}...best value = {self.best_function}" 1008 ) 1009 1010 # Select parents 1011 1012 par: array2D = np.empty((self.parents_count, self.dim)) 1013 """samples chosen to create new samples""" 1014 par_scores: array1D = np.empty(self.parents_count) 1015 1016 elit_slice = slice(None, self.elit_count) 1017 """elit parents""" 1018 1019 # copy needs because the generation will be removed after parents selection 1020 par[elit_slice] = pop[elit_slice].copy() 1021 par_scores[elit_slice] = scores[elit_slice].copy() 1022 1023 new_par_inds = ( 1024 self.selection( 1025 scores[self.elit_count:], 1026 self.parents_count - self.elit_count 1027 ).astype(np.int16) + self.elit_count 1028 ) 1029 """non-elit parents indexes""" 1030 #new_par_inds = self.selection(scores, self.parents_count - self.elit_count).astype(np.int16) 1031 par_slice = slice(self.elit_count, self.parents_count) 1032 par[par_slice] = pop[new_par_inds].copy() 1033 par_scores[par_slice] = scores[new_par_inds].copy() 1034 1035 pop = np.empty((self.population_size, self.dim)) 1036 """new generation""" 1037 scores = np.empty(self.population_size) 1038 """new generation scores""" 1039 1040 parents_slice = slice(None, self.parents_count) 1041 pop[parents_slice] = par 1042 scores[parents_slice] = par_scores 1043 1044 if self.fill_children is None: # default fill children behaviour 1045 DO_MUTATION = self.needs_mutation 1046 for k in range(self.parents_count, self.population_size, 2): 1047 1048 r1, r2 = get_parents_inds() 1049 1050 pvar1 = pop[r1] # equal to par[r1], but better for cache 1051 pvar2 = pop[r2] 1052 1053 ch1, ch2 = self.crossover(pvar1, pvar2) 1054 1055 if DO_MUTATION: 1056 ch1 = 
self.mut(ch1) 1057 ch2 = self.mut_middle(ch2, pvar1, pvar2) 1058 1059 pop[k] = ch1 1060 pop[k+1] = ch2 1061 else: # custom behaviour 1062 self.fill_children(pop, self.parents_count) 1063 1064 if apply_function_to_parents: 1065 scores = self.set_function(pop) 1066 else: 1067 scores[self.parents_count:] = self.set_function(pop[self.parents_count:]) 1068 1069 # remove duplicates 1070 pop, scores = remover(pop, scores, t) 1071 # revolution 1072 pop, scores = revolution(pop, scores, count_stagnation) 1073 1074 # make population better 1075 args_to_sort = scores.argsort() 1076 pop = pop[args_to_sort] 1077 scores = scores[args_to_sort] 1078 self._update_report(scores) 1079 1080 # callback it 1081 total_callback(t, self.report, pop, scores) 1082 total_middle_callback() 1083 1084 t += 1 1085 1086 self.best_function = fast_min(scores[0], self.best_function) 1087 1088 last_generation = Generation(pop, scores) 1089 self.result = GAResult(last_generation) 1090 1091 if save_last_generation_as is not None: 1092 last_generation.save(save_last_generation_as) 1093 1094 if enable_printing: 1095 show = ' ' * 200 1096 sys.stdout.write( 1097 f'\r{show}\n' 1098 f'\r The best found solution:\n {pop[0]}' 1099 f'\n\n Objective function:\n {self.best_function}\n' 1100 f'\n Used generations: {len(self.report)}' 1101 f'\n Used time: {time.time() - start_time:.3g} seconds\n' 1102 ) 1103 sys.stdout.flush() 1104 1105 if not no_plot: 1106 self.plot_results() 1107 1108 if enable_printing: 1109 if reason_to_stop is not None and 'iterations' not in reason_to_stop: 1110 sys.stdout.write( 1111 f'\nWarning: GA is terminated because of {reason_to_stop}' 1112 ) 1113 1114 return self.result
runs optimization process
Arguments:
- no_plot: do not plot results using matplotlib by default
- disable_printing: do not print log info of optimization process (except progress bar)
- progress_bar_stream: 'stdout', 'stderr' or None to disable progress bar; disabling progress bar can speed up the optimization process in many cases
- disable_progress_bar: deprecated
- function: the given objective function (sample -> its score) to be minimized;
- function_timeout: if the given function does not provide output before function_timeout (unit is seconds), the algorithm raises an error, for example when there is an infinite loop in the given function. `None` means disabling
- set_function: set function (all samples -> score per sample) to be used instead of usual function (usually for optimization purposes)
- apply_function_to_parents: whether to apply function to parents from previous generation (if it's needed), it can be needed at working with games agents, but for other tasks will just waste time
- start_generation: initial generation object of any `GenerationConvertible` type
- studEA: using stud EA strategy (crossover with best object always)
- mutation_indexes: indexes of dimensions where mutation can be performed (all dimensions by default)
- init_creator: the function creates population samples. By default -- random uniform for real variables and random uniform for int
- init_oppositors: the list of oppositors creates oppositions for base population. No by default
- duplicates_oppositor: oppositor for applying after duplicates removing. By default -- using just random initializer from creator
- remove_duplicates_generation_step: step for removing duplicates (have a sense with discrete tasks). No by default
- revolution_oppositor: oppositor for revolution time. No by default
- revolution_after_stagnation_step: create revolution after this generations of stagnation. No by default
- revolution_part: float, the part of the generation to be opposed. By default is 0.3
- population_initializer: object for actions at population initialization step to create better start population. See doc
- stop_when_reached: stop searching after reaching this value (it can be potential minimum or something else)
- callbacks: sequence of callback functions with structure: (generation_number, report_list, last_population, last_scores) -> do some action
- middle_callbacks: sequence of functions made by the `MiddleCallback` class
- time_limit_secs: limit time of working (in seconds); `None` means no time limit (limit will be only for count of generation and so on)
- save_last_generation_as: path to .npz file for saving last_generation as numpy dictionary like {'population': 2D-array, 'scores': 1D-array}; None disables this option
- seed: random seed (None if doesn't matter)
Returns:
`GAResult` object; also fills self.report and self.result with report information
Notes:
- if `function_timeout` is enabled then `function` must be set
- it would be more logical to use params like `studEA` as algorithm parameters, but the `run()` way can be more convenient in real use
def plot_results(
    self,
    title: str = 'Genetic Algorithm',
    save_as: Optional[str] = None,
    dpi: int = 200,
    main_color: str = 'blue'
):
    """
    Draws the content of self.report as one line chart

    Arguments:
        title: the chart title
        save_as: forwarded to the plotting helper as its `save_as` argument
        dpi: forwarded to the plotting helper as its `dpi` argument
        main_color: color of the single plotted line

    Notes:
        if self.report is empty, nothing is plotted; a short message
        is written to stdout instead
    """
    history = self.report

    if len(history) > 0:
        # one line only: the report collected during the run
        options = dict(
            lines=[history],
            colors=[main_color],
            labels=['best of generation'],
            linewidths=[2],
            title=title,
            xlabel='Generation',
            ylabel='Minimized function',
            save_as=save_as,
            dpi=dpi
        )
        plot_several_lines(**options)
    else:
        sys.stdout.write("No results to plot!\n")
Simple plot of self.report (if not empty)
def plot_generation_scores(self, title: str = 'Last generation scores', save_as: Optional[str] = None):
    """
    Shows the scores of the last generation as a bar plot

    Arguments:
        title: the plot title
        save_as: forwarded to the plotting helper as its third argument

    Raises:
        Exception: if the object has no 'result' attribute
            (the searching process was not run yet)
    """
    if hasattr(self, 'result'):
        last_scores = self.result.last_generation.scores
        plot_pop_scores(last_scores, title, save_as)
        return

    raise Exception(
        "There is no 'result' field into ga object! "
        "Before plotting generation u need to run seaching process"
    )
Plots barplot of scores of last population
def mut(self, x: array1D):
    """
    Plain mutation of one sample, performed in place

    Each index from self.indexes_int_mut is mutated with probability
    self.prob_mut_discrete using self.discrete_mutation; after that each index
    from self.indexes_float_mut is mutated with probability self.prob_mut using
    self.real_mutation. A mutation function receives the current value followed
    by the items of self.var_bounds for that index

    Arguments:
        x: the sample to mutate (it is modified)

    Returns:
        the same (mutated) object x
    """

    def roll(probability) -> bool:
        # one random draw per candidate dimension
        return random.random() < probability

    for dim in self.indexes_int_mut:
        if roll(self.prob_mut_discrete):
            bounds = self.var_bounds[dim]
            x[dim] = self.discrete_mutation(x[dim], *bounds)

    for dim in self.indexes_float_mut:
        if roll(self.prob_mut):
            bounds = self.var_bounds[dim]
            x[dim] = self.real_mutation(x[dim], *bounds)

    return x
just mutation
def mut_middle(self, x: array1D, p1: array1D, p2: array1D):
    """
    mutation oriented on parents, performed in place

    For each mutated dimension the new value is drawn between the values of
    both parents; if the parents are equal at this dimension, the value is
    drawn from the bounds of the dimension (self.var_bounds) instead

    Arguments:
        x: the sample to mutate (it is modified)
        p1: first parent
        p2: second parent

    Returns:
        the same (mutated) object x

    Notes:
        integer dimensions are converted with int() before drawing:
        random.randint rejects non-integer types (like numpy floats)
        with TypeError since Python 3.12
        NOTE(review): assumes the genes and the bounds of integer dimensions
        hold integer values, so the conversion loses nothing -- confirm
    """
    for i in self.indexes_int_mut:

        if random.random() < self.prob_mut_discrete:
            v1, v2 = int(p1[i]), int(p2[i])
            if v1 == v2:
                # parents give no interval, use the whole range of the dimension
                low, high = self.var_bounds[i]
                low, high = int(low), int(high)
            else:
                low, high = min(v1, v2), max(v1, v2)
            x[i] = random.randint(low, high)

    for i in self.indexes_float_mut:
        if random.random() < self.prob_mut:
            v1, v2 = p1[i], p2[i]
            if v1 != v2:
                # random.uniform accepts the ends in any order
                x[i] = random.uniform(v1, v2)
            else:
                x[i] = random.uniform(*self.var_bounds[i])
    return x
mutation oriented on parents
@staticmethod
def default_set_function(function_for_set: Callable[[array1D], float]):
    """
    builds a set function from a usual (sample -> score) function

    Arguments:
        function_for_set: the function to be called for each row of the population

    Returns:
        the function (population matrix -> numpy array of row results);
        rows are processed one by one in their order
    """
    def func(matrix: array2D):
        row_scores = list(map(function_for_set, matrix))
        return np.array(row_scores)

    return func
simple function for creating set_function function_for_set just applies to each row of population
@staticmethod
def vectorized_set_function(function_for_set: Callable[[array1D], float]):
    """
    builds a set function by wrapping the usual (sample -> score) function
    with np.vectorize, so function_for_set is applied to each row of the population

    Arguments:
        function_for_set: the function to be called for each row (1D array)

    Returns:
        np.vectorize object with signature '(n)->()'

    Notes:
        the original note says it is faster for big populations and slower
        for little ones compared to the default set function
        NOTE(review): numpy documents np.vectorize as a convenience wrapper --
        measure before relying on the speed
    """
    # '(n)->()' means: take a whole 1D row, give one scalar
    return np.vectorize(function_for_set, signature='(n)->()')
works like default, but faster for big populations and slower for little; function_for_set just applies to each row of population
@staticmethod
def set_function_multiprocess(function_for_set: Callable[[array1D], float], n_jobs: int = -1):
    """
    like default set function, but rows are evaluated using joblib with n_jobs
    (-1 goes to count of available processors)

    Arguments:
        function_for_set: the function to be called for each row of the population
        n_jobs: passed to joblib.Parallel as is

    Returns:
        the function (population matrix -> numpy array of row results)

    Raises:
        ModuleNotFoundError: if joblib is not installed
            (raised at the moment of this call, not at the evaluation)
    """
    try:
        from joblib import Parallel, delayed
    except ModuleNotFoundError as err:
        # keep the original error as the cause; the message parts are separated explicitly
        raise ModuleNotFoundError(
            "this additional feature requires joblib package, "
            "run `pip install joblib` or `pip install --upgrade geneticalgorithm2[full]` "
            "or use another set function"
        ) from err

    def func(matrix: array2D):
        result = Parallel(n_jobs=n_jobs)(delayed(function_for_set)(row) for row in matrix)
        return np.array(result)
    return func
like function_for_set but uses joblib with n_jobs (-1 goes to count of available processors)