Edit on GitHub

geneticalgorithm2.geneticalgorithm2

   1from typing import Callable, List, Tuple, Optional, Dict, Any, Union, Sequence, Literal, Iterable
   2from typing_extensions import TypeAlias
   3
   4import collections
   5import warnings
   6import operator
   7
   8import sys
   9import time
  10import random
  11import math
  12
  13import numpy as np
  14
  15from OppOpPopInit.initialiser import CreatorFunc
  16from OppOpPopInit.oppositor import OppositorFunc
  17
  18#region INTERNAL IMPORTS
  19
  20from .utils.aliases import array1D, array2D
  21
  22from .data_types.aliases import FunctionToMinimize, SetFunctionToMinimize
  23from .data_types.algorithm_params import AlgorithmParams
  24from .data_types.generation import GenerationConvertible, Generation
  25from .data_types.result import GAResult
  26
  27from .population_initializer import get_population_initializer, PopulationModifier
  28from .utils.plotting import plot_pop_scores, plot_several_lines
  29
  30from .utils.funcs import can_be_prob, is_numpy, is_current_gen_number, fast_min, random_indexes_pair
  31
  32from .callbacks.data import MiddleCallbackData
  33from .callbacks import MiddleCallbackFunc, SimpleCallbackFunc
  34
  35#endregion
  36
  37#region ALIASES
  38
  39VARIABLE_TYPE: TypeAlias = Literal['int', 'real', 'bool']
  40"""
the variable type for one or all dimensions; determines how the values are discretized:

    real: double numbers

    int: integer numbers only

    bool: effectively an integer with bounds [0, 1]
  48"""
  49
  50#endregion
  51
  52
  53class GeneticAlgorithm2:
  54    """
  55    Genetic algorithm optimization process
  56    """
  57    
  58    default_params = AlgorithmParams()
  59    PROGRESS_BAR_LEN = 20
  60    """max count of symbols in the progress bar"""
  61
  62    @property
  63    def output_dict(self):
  64        warnings.warn(
  65            "'output_dict' is deprecated and will be removed at version 7 \n"
  66            "use 'result' instead"
  67        )
  68        return self.result
  69
  70    @property
  71    def needs_mutation(self) -> bool:
  72        """whether the mutation is required"""
  73        return self.prob_mut > 0 or self.prob_mut_discrete > 0
  74
  75    #region INIT
  76
  77    def __init__(
  78        self,
  79        function: FunctionToMinimize = None,
  80
  81        dimension: int = 0,
  82        variable_type: Union[VARIABLE_TYPE, Sequence[VARIABLE_TYPE]] = 'bool',
  83        variable_boundaries: Optional[Union[array2D, Sequence[Tuple[float, float]]]] = None,
  84
  85        variable_type_mixed=None,
  86
  87        function_timeout: Optional[float] = None,
  88        algorithm_parameters: Union[AlgorithmParams, Dict[str, Any]] = default_params
  89    ):
  90        """
  91        initializes the GA object and performs main checks
  92
  93        Args:
  94            function: the given objective function to be minimized -- deprecated and moved to run() method
  95            dimension: the number of decision variables, the population samples dimension
  96
  97            variable_type: string means the variable type for all variables,
  98                for mixed types use sequence of strings of type for each variable
  99
 100            variable_boundaries: leave it None if variable_type is 'bool';
 101                otherwise provide a sequence of tuples of length two as boundaries for each variable;
 102                the length of the array must be equal dimension.
 103                For example, ([0,100], [0,200]) determines
 104                    lower boundary 0 and upper boundary 100 for first
 105                    and upper boundary 200 for second variable
 106                    and dimension must be 2.
 107
 108            variable_type_mixed -- deprecated
 109
 110            function_timeout: if the given function does not provide
 111                output before function_timeout (unit is seconds) the algorithm raises error.
 112                For example, when there is an infinite loop in the given function.
 113                `None` means disabling -- deprecated and moved to run()
 114
 115            algorithm_parameters: AlgorithmParams object or usual dictionary with algorithm parameter;
 116                it is not mandatory to provide all possible parameters
 117
 118        Notes:
 119            - This implementation minimizes the given objective function.
 120            For maximization u can multiply the function by -1 (for instance): the absolute
 121                value of the output would be the actual objective function
 122
 123        for more details and examples of implementation please visit:
 124            https://github.com/PasaOpasen/geneticalgorithm2
 125  
 126        """
 127
 128        # all default fields
 129
 130        # self.crossover: Callable[[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray]] = None
 131        # self.real_mutation: Callable[[float, float, float], float] = None
 132        # self.discrete_mutation: Callable[[int, int, int], int] = None
 133        # self.selection: Callable[[np.ndarray, int], np.ndarray] = None
 134
 135        self.result = None
 136        self.best_function = None
 137
 138        self.revolution_oppositor = None
 139        self.dup_oppositor = None
 140        self.creator = None
 141        self.init_oppositors = None
 142
 143        self.f: Callable[[array1D], float] = None
 144        self.funtimeout: float = None
 145
 146        self.set_function: Callable[[np.ndarray], np.ndarray] = None
 147
 148        # self.dim: int = None
 149        self.var_bounds: List[Tuple[Union[int, float], Union[int, float]]] = None
 150        # self.indexes_int: np.ndarray = None
 151        # self.indexes_float: np.ndarray = None
 152
 153        self.checked_reports: List[Tuple[str, Callable[[array1D], None]]] = None
 154
 155        self.population_size: int = None
 156        self.progress_stream = None
 157
 158        # input algorithm's parameters
 159
 160        assert isinstance(algorithm_parameters, (dict, AlgorithmParams)), (
 161            "algorithm_parameters must be dict or AlgorithmParams object"
 162        )
 163        if not isinstance(algorithm_parameters, AlgorithmParams):
 164            algorithm_parameters = AlgorithmParams.from_dict(algorithm_parameters)
 165        algorithm_parameters.validate()
 166        self.param = algorithm_parameters
 167
 168        self.crossover, self.real_mutation, self.discrete_mutation, self.selection = algorithm_parameters.get_CMS_funcs()
 169
 170        # dimension and area bounds
 171        self.dim = int(dimension)
 172        assert self.dim > 0, f"dimension count must be int and >0, gotten {dimension}"
 173
 174        if variable_type_mixed is not None:
 175            warnings.warn(
 176                f"argument variable_type_mixed is deprecated and will be removed at version 7\n "
 177                f"use variable_type={tuple(variable_type_mixed)} instead"
 178            )
 179            variable_type = variable_type_mixed
 180        self._set_types_indexes(variable_type)  # types indexes
 181        self._set_var_boundaries(variable_type, variable_boundaries)  # input variables' boundaries
 182
 183        # fix mutation probs
 184
 185        assert can_be_prob(self.param.mutation_probability)
 186        self.prob_mut = self.param.mutation_probability
 187        assert self.param.mutation_discrete_probability is None or can_be_prob(self.param.mutation_discrete_probability)
 188        self.prob_mut_discrete = self.param.mutation_discrete_probability or self.prob_mut
 189
 190        if self.param.crossover_probability is not None:
 191            warnings.warn(
 192                f"crossover_probability is deprecated and will be removed in version 7. "
 193                f"Reason: it's old and has no sense"
 194            )
 195
 196        #############################################################
 197        # input function
 198        if function:
 199            warnings.warn(
 200                f"function is deprecated in init constructor and will be removed in version 7. "
 201                f"Move this argument to run() method"
 202            )
 203            self._check_function(function)
 204        if function_timeout:
 205            warnings.warn(
 206                f"function_timeout is deprecated in init constructor and will be removed in version 7. "
 207                f"Move this argument to run() method"
 208            )
 209            self._check_function_timeout(function_timeout)
 210
 211        #############################################################
 212        
 213        self.population_size = int(self.param.population_size)
 214        self._set_parents_count(self.param.parents_portion)
 215        self._set_elit_count(self.population_size, self.param.elit_ratio)
 216        assert self.parents_count >= self.elit_count, (
 217            f"\n number of parents ({self.parents_count}) "
 218            f"must be greater than number of elits ({self.elit_count})"
 219        )
 220
 221        self._set_max_iterations()
 222
 223        self._set_report()
 224
 225        # specify this function to speed up or change default behaviour
 226        self.fill_children: Optional[Callable[[array2D, int], None]] = None
 227        """
 228        custom function which adds children for population POP 
 229            where POP[:parents_count] are parents lines and next lines are for children
 230        """
 231
 232    def _set_types_indexes(self, variable_type: Union[str, Sequence[str]]):
 233
 234        indexes = np.arange(self.dim)
 235        self.indexes_int = np.array([])
 236        self.indexes_float = np.array([])
 237
 238        assert_message = (
 239            f"\n variable_type must be 'bool', 'int', 'real' or a sequence with 'int' and 'real', got {variable_type}"
 240        )
 241
 242        if isinstance(variable_type, str):
 243            assert (variable_type in VARIABLE_TYPE.__args__), assert_message
 244            if variable_type == 'real':
 245                self.indexes_float = indexes
 246            else:
 247                self.indexes_int = indexes
 248
 249        else:  # sequence case
 250
 251            assert len(variable_type) == self.dim, (
 252                f"\n variable_type must have a length equal dimension. "
 253                f"Should be {self.dim}, got {len(variable_type)}"
 254            )
 255            assert 'bool' not in variable_type, (
 256                "don't use 'bool' if variable_type is a sequence, "
 257                "for 'boolean' case use 'int' and specify boundary as (0,1)"
 258            )
 259            assert all(v in VARIABLE_TYPE.__args__ for v in variable_type), assert_message
 260
 261            vartypes = np.array(variable_type)
 262            self.indexes_int = indexes[vartypes == 'int']
 263            self.indexes_float = indexes[vartypes == 'real']
 264
 265    def _set_var_boundaries(
 266        self,
 267        variable_type: Union[str, Sequence[str]],
 268        variable_boundaries
 269    ):
 270        if isinstance(variable_type, str) and variable_type == 'bool':
 271            self.var_bounds = [(0, 1)] * self.dim
 272        else:
 273
 274            if is_numpy(variable_boundaries):
 275                assert variable_boundaries.shape == (self.dim, 2), (
 276                    f"\n if variable_boundaries is numpy array, it must be with shape (dim, 2)"
 277                )
 278            else:
 279                assert len(variable_boundaries) == self.dim and all((len(t) == 2 for t in variable_boundaries)), (
 280                    "\n if variable_boundaries is sequence, "
 281                    "it must be with len dim and boundary for each variable must be a tuple of length two"
 282                )
 283
 284            for i in variable_boundaries:
 285                assert i[0] <= i[1], "\n lower_boundaries must be smaller than upper_boundaries [lower,upper]"
 286
 287            self.var_bounds = [(i[0], i[1]) for i in variable_boundaries]
 288
 289    def _set_parents_count(self, parents_portion: float):
 290
 291        self.parents_count = int(parents_portion * self.population_size)
 292        assert self.population_size >= self.parents_count > 1, (
 293            f'parents count {self.parents_count} cannot be less than population size {self.population_size}'
 294        )
 295        trl = self.population_size - self.parents_count
 296        if trl % 2 != 0:
 297            self.parents_count += 1
 298
 299    def _set_elit_count(self, pop_size: int, elit_ratio: Union[float, int]):
 300
 301        assert elit_ratio >= 0
 302        self.elit_count = elit_ratio if isinstance(elit_ratio, str) else math.ceil(pop_size*elit_ratio)
 303
 304    def _set_max_iterations(self):
 305
 306        if self.param.max_num_iteration is None:
 307            iterate = 0
 308            for i in range(0, self.dim):
 309                bound_min, bound_max = self.var_bounds[i]
 310                var_space = bound_max - bound_min
 311                if i in self.indexes_int:
 312                    iterate += var_space * self.dim * (100 / self.population_size)
 313                else:
 314                    iterate += var_space * 50 * (100 / self.population_size)
 315            iterate = int(iterate)
 316            if (iterate * self.population_size) > 10000000:
 317                iterate = 10000000 / self.population_size
 318
 319            self.max_iterations = fast_min(iterate, 8000)
 320        else:
 321            assert self.param.max_num_iteration > 0
 322            self.max_iterations = math.ceil(self.param.max_num_iteration)
 323
 324        max_it = self.param.max_iteration_without_improv
 325        if max_it is None:
 326            self.max_stagnations = self.max_iterations + 1
 327        else:
 328            self.max_stagnations = math.ceil(max_it)
 329
 330    def _check_function(self, function: Callable[[array1D], float]):
 331        assert callable(function), "function must be callable!"
 332        self.f = function
 333
 334    def _check_function_timeout(self, function_timeout: Optional[float]):
 335
 336        if function_timeout is not None and function_timeout > 0:
 337            try:
 338                from func_timeout import func_timeout, FunctionTimedOut
 339            except ModuleNotFoundError:
 340                raise ModuleNotFoundError(
 341                    "function_timeout > 0 needs additional package func_timeout\n"
 342                    "run `python -m pip install func_timeout`\n"
 343                    "or disable this parameter: function_timeout=None"
 344                )
 345
 346        self.funtimeout = None if function_timeout is None else float(function_timeout)
 347
 348    #endregion
 349
 350    #region REPORT
 351
 352    def _set_report(self):
 353        """
 354        creates default report checker
 355        """
 356        self.checked_reports = [
 357            # item 0 cuz scores will be sorted and min item is items[0]
 358            ('report', operator.itemgetter(0))
 359        ]
 360
 361    def _clear_report(self):
 362        """
 363        removes all report objects
 364        """
 365        fields = [f for f in vars(self).keys() if f.startswith('report')]
 366        for attr in fields:
 367            delattr(self, attr)
 368
 369    def _init_report(self):
 370        """
 371        makes empty report fields
 372        """
 373        for name, _ in self.checked_reports:
 374            setattr(self, name, [])
 375
 376    def _update_report(self, scores: array1D):
 377        """
 378        append report value to the end of field
 379        """
 380        for name, func in self.checked_reports:
 381            getattr(self, name).append(
 382                func(scores)
 383            )
 384
 385    #endregion
 386
 387    #region RUN METHODS
 388
 389    def _progress(self, count: int, total: int, status: str = ''):
 390
 391        part = count / total
 392
 393        filled_len = round(GeneticAlgorithm2.PROGRESS_BAR_LEN * part)
 394        percents = round(100.0 * part, 1)
 395        bar = '|' * filled_len + '_' * (GeneticAlgorithm2.PROGRESS_BAR_LEN - filled_len)
 396
 397        self.progress_stream.write('\r%s %s%s %s' % (bar, percents, '%', status))
 398        self.progress_stream.flush()
 399
 400    def __str__(self):
 401        return f"Genetic algorithm object with parameters {self.param}"
 402
 403    def __repr__(self):
 404        return self.__str__()
 405
 406    def _simulate(self, sample: array1D):
 407
 408        from func_timeout import func_timeout, FunctionTimedOut
 409
 410        obj = None
 411        eval_time = time.time()
 412        try:
 413            obj = func_timeout(
 414                self.funtimeout,
 415                lambda: self.f(sample)
 416            )
 417        except FunctionTimedOut:
 418            print("given function is not applicable")
 419        eval_time = time.time() - eval_time
 420
 421        assert obj is not None, (
 422            f"the given function was running like {eval_time} seconds and does not provide any output"
 423        )
 424
 425        tp = type(obj)
 426        assert (
 427            tp in (int, float) or np.issubdtype(tp, np.floating) or np.issubdtype(tp, np.integer)
 428        ), f"Minimized function should return a number, but got '{obj}' object with type {tp}"
 429
 430        return obj, eval_time
 431
 432    def _set_mutation_indexes(self, mutation_indexes: Optional[Iterable[int]]):
 433
 434        if mutation_indexes is None:
 435            self.indexes_float_mut = self.indexes_float
 436            self.indexes_int_mut = self.indexes_int
 437        else:
 438            tmp_indexes = set(mutation_indexes)
 439            self.indexes_int_mut = np.array(list(tmp_indexes.intersection(self.indexes_int)))
 440            self.indexes_float_mut = np.array(list(tmp_indexes.intersection(self.indexes_float)))
 441
 442            if self.indexes_float_mut.size == 0 and self.indexes_int_mut.size == 0:
 443                warnings.warn(f"No mutation dimensions!!! Check ur mutation indexes!!")
 444
 445    #@profile
 446    def run(
 447        self,
 448        no_plot: bool = False,
 449        disable_printing: bool = False,
 450        progress_bar_stream: Optional[str] = 'stdout',
 451
 452        # deprecated
 453        disable_progress_bar: bool = False,
 454
 455        function: FunctionToMinimize = None,
 456        function_timeout: Optional[float] = None,
 457
 458        set_function: SetFunctionToMinimize = None,
 459        apply_function_to_parents: bool = False,
 460        start_generation: GenerationConvertible = Generation(),
 461        studEA: bool = False,
 462        mutation_indexes: Optional[Iterable[int]] = None,
 463
 464        init_creator: Optional[CreatorFunc] = None,
 465        init_oppositors: Optional[Sequence[OppositorFunc]] = None,
 466
 467        duplicates_oppositor: Optional[OppositorFunc] = None,
 468        remove_duplicates_generation_step: Optional[int] = None,
 469
 470        revolution_oppositor: Optional[OppositorFunc] = None,
 471        revolution_after_stagnation_step: Optional[int] = None,
 472        revolution_part: float = 0.3,
 473
 474        population_initializer: Tuple[
 475            int, PopulationModifier
 476        ] = get_population_initializer(select_best_of=1, local_optimization_step='never', local_optimizer=None),
 477
 478        stop_when_reached: Optional[float] = None,
 479        callbacks: Optional[Sequence[SimpleCallbackFunc]] = None,
 480        middle_callbacks: Optional[Sequence[MiddleCallbackFunc]] = None,  #+
 481        time_limit_secs: Optional[float] = None,
 482        save_last_generation_as: Optional[str] = None,
 483        seed: Optional[int] = None
 484    ) -> GAResult:
 485        """
 486        runs optimization process
 487
 488        Args:
 489            no_plot: do not plot results using matplotlib by default
 490
 491            disable_printing: do not print log info of optimization process (except progress bar)
 492
 493            progress_bar_stream: 'stdout', 'stderr' or None to disable progress bar;
 494                disabling progress bar can speed up the optimization process in many cases
 495
 496            disable_progress_bar: deprecated
 497
 498            function: the given objective function (sample -> its score) to be minimized;
 499
 500            function_timeout: if the given function does not provide
 501                output before function_timeout (unit is seconds) the algorithm raises error.
 502                For example, when there is an infinite loop in the given function.
 503                `None` means disabling
 504
 505            set_function: set function (all samples -> score per sample) to be used instead of usual function
 506                (usually for optimization purposes)
 507
 508            apply_function_to_parents: whether to apply function to parents from previous generation (if it's needed), 
 509                it can be needed at working with games agents, but for other tasks will just waste time
 510
 511            start_generation: initial generation object of any `GenerationConvertible` type
 512
 513            studEA: using stud EA strategy (crossover with best object always)
 514
 515            mutation_indexes: indexes of dimensions where mutation can be performed (all dimensions by default)
 516
 517            init_creator: the function creates population samples.
 518                By default -- random uniform for real variables and random uniform for int
 519            init_oppositors: the list of oppositors creates oppositions for base population. No by default
 520
 521            duplicates_oppositor: oppositor for applying after duplicates removing.
 522                By default -- using just random initializer from creator
 523            remove_duplicates_generation_step: step for removing duplicates (have a sense with discrete tasks).
 524                No by default
 525
 526            revolution_oppositor: oppositor for revolution time. No by default
 527            revolution_after_stagnation_step: create revolution after this generations of stagnation. No by default
 528            revolution_part: float, the part of generation to being oppose. By default is 0.3
 529
 530            population_initializer: object for actions at population initialization step
 531                to create better start population. See doc
 532
 533            stop_when_reached: stop searching after reaching this value 
 534                (it can be potential minimum or something else)
 535
 536            callbacks: sequence of callback functions with structure:
 537                (generation_number, report_list, last_population, last_scores) -> do some action
 538
 539            middle_callbacks: sequence of functions made by `MiddleCallback` class
 540
 541            time_limit_secs: limit time of working (in seconds);
 542                `None` means no time limit (limit will be only for count of generation and so on)
 543
 544            save_last_generation_as: path to .npz file 
 545                for saving last_generation as numpy dictionary like
 546                {'population': 2D-array, 'scores': 1D-array}; 
 547                None disables this option
 548
 549            seed: random seed (None if doesn't matter)
 550
 551        Returns:
 552            `GAResult` object;
 553            also fills the self.report and self.result with many report information
 554
 555        Notes:
 556            - if `function_timeout` is enabled then `function` must be set
 557            - it would be more logical to use params like `studEA` as an algorithm parameter, 
 558                but `run()`-way can be more convenient for real using
 559        """
 560
 561        if disable_progress_bar:
 562            warnings.warn(
 563                f"disable_progress_bar is deprecated and will be removed in version 7, "
 564                f"use probress_bar_stream=None to disable progress bar"
 565            )
 566            progress_bar_stream = None
 567
 568        enable_printing: bool = not disable_printing
 569
 570        start_generation = Generation.from_object(self.dim, start_generation)
 571
 572        assert is_current_gen_number(revolution_after_stagnation_step), "must be None or int and >0"
 573        assert is_current_gen_number(remove_duplicates_generation_step), "must be None or int and >0"
 574        assert can_be_prob(revolution_part), f"revolution_part must be in [0,1], not {revolution_part}"
 575        assert stop_when_reached is None or isinstance(stop_when_reached, (int, float))
 576        assert isinstance(callbacks, collections.abc.Sequence) or callbacks is None, (
 577            "callbacks should be a list of callbacks functions"
 578        )
 579        assert isinstance(middle_callbacks, collections.abc.Sequence) or middle_callbacks is None, (
 580            "middle_callbacks should be list of MiddleCallbacks functions"
 581        )
 582        assert time_limit_secs is None or time_limit_secs > 0, 'time_limit_secs must be None of number > 0'
 583
 584        self._set_mutation_indexes(mutation_indexes)
 585        from OppOpPopInit import set_seed
 586        set_seed(seed)
 587
 588        # randomstate = np.random.default_rng(random.randint(0, 1000) if seed is None else seed)
 589        # self.randomstate = randomstate
 590
 591        # using bool flag is faster than using empty function with generated string arguments
 592        SHOW_PROGRESS = progress_bar_stream is not None
 593        if SHOW_PROGRESS:
 594
 595            show_progress = lambda t, t2, s: self._progress(t, t2, status=s)
 596
 597            if progress_bar_stream == 'stdout':
 598                self.progress_stream = sys.stdout
 599            elif progress_bar_stream == 'stderr':
 600                self.progress_stream = sys.stderr
 601            else:
 602                raise Exception(
 603                    f"wrong value {progress_bar_stream} of progress_bar_stream, must be 'stdout'/'stderr'/None"
 604                )
 605        else:
 606            show_progress = None
 607
 608        stop_by_val = (
 609            (lambda best_f: False)
 610            if stop_when_reached is None
 611            else (lambda best_f: best_f <= stop_when_reached)
 612        )
 613
 614        t: int = 0
 615        count_stagnation: int = 0
 616        pop: array2D = None
 617        scores: array1D = None
 618
 619        #region CALLBACKS
 620
 621        def get_data():
 622            """
 623            returns all important data about model
 624            """
 625            return MiddleCallbackData(
 626                last_generation=Generation(pop, scores),
 627                current_generation=t,
 628                report_list=self.report,
 629
 630                mutation_prob=self.prob_mut,
 631                mutation_discrete_prob=self.prob_mut_discrete,
 632                mutation=self.real_mutation,
 633                mutation_discrete=self.discrete_mutation,
 634                crossover=self.crossover,
 635                selection=self.selection,
 636
 637                current_stagnation=count_stagnation,
 638                max_stagnation=self.max_stagnations,
 639
 640                parents_portion=self.param.parents_portion,
 641                elit_ratio=self.param.elit_ratio,
 642
 643                set_function=self.set_function,
 644
 645                reason_to_stop=reason_to_stop
 646            )
 647
 648        def set_data(data: MiddleCallbackData):
 649            """
 650            sets data to model
 651            """
 652            nonlocal pop, scores, count_stagnation, reason_to_stop
 653
 654            pop, scores = data.last_generation.variables, data.last_generation.scores
 655            self.population_size = pop.shape[0]
 656
 657            self.param.parents_portion = data.parents_portion
 658            self._set_parents_count(data.parents_portion)
 659
 660            self.param.elit_ratio = data.elit_ratio
 661            self._set_elit_count(self.population_size, data.elit_ratio)
 662
 663            self.prob_mut = data.mutation_prob
 664            self.prob_mut_discrete = data.mutation_discrete_prob
 665            self.real_mutation = data.mutation
 666            self.discrete_mutation = data.mutation_discrete
 667            self.crossover = data.crossover
 668            self.selection = data.selection
 669
 670            count_stagnation = data.current_stagnation
 671            reason_to_stop = data.reason_to_stop
 672            self.max_stagnations = data.max_stagnation
 673
 674            self.set_function = data.set_function
 675
 676        if not callbacks:
 677            total_callback = lambda g, r, lp, ls: None
 678        else:
 679            def total_callback(
 680                generation_number: int,
 681                report_list: List[float],
 682                last_population: array2D,
 683                last_scores: array1D
 684            ):
 685                for cb in callbacks:
 686                    cb(generation_number, report_list, last_population, last_scores)
 687
 688        if not middle_callbacks:
 689            total_middle_callback = lambda: None
 690        else:
 691            def total_middle_callback():
 692                """
 693                applies callbacks and sets new data if there is a sense
 694                """
 695                data = get_data()
 696                flag = False
 697                for cb in middle_callbacks:
 698                    data, has_sense = cb(data)
 699                    if has_sense:
 700                        flag = True
 701                if flag:
 702                    set_data(data)  # update global date if there was real callback step
 703
 704        #endregion
 705
 706        start_time = time.time()
 707        time_is_done = (
 708            (lambda: False)
 709            if time_limit_secs is None
 710            else (lambda: int(time.time() - start_time) >= time_limit_secs)
 711        )
 712
 713        # combine with deprecated parts
 714        function = function or self.f
 715        function_timeout = function_timeout or self.funtimeout
 716
 717        assert function or set_function, 'no function to minimize'
 718        if function:
 719            self._check_function(function)
 720        if function_timeout:
 721            self._check_function_timeout(function_timeout)
 722
 723        self.set_function = set_function or GeneticAlgorithm2.default_set_function(self.f)
 724
 725        #region Initial population, duplicates filter, revolutionary
 726
 727        pop_coef, initializer_func = population_initializer
 728        
 729        # population creator by random or with oppositions
 730        if init_creator is None:
 731
 732            from OppOpPopInit import SampleInitializers
 733
 734            # just uniform random
 735            self.creator = SampleInitializers.Combined(
 736                minimums=[v[0] for v in self.var_bounds],
 737                maximums=[v[1] for v in self.var_bounds],
 738                indexes=(
 739                    self.indexes_int,
 740                    self.indexes_float
 741                ),
 742                creator_initializers=(
 743                    SampleInitializers.RandomInteger,
 744                    SampleInitializers.Uniform
 745                )
 746            )
 747        else:
 748            assert callable(init_creator)
 749            self.creator = init_creator
 750
 751        self.init_oppositors = init_oppositors
 752        self.dup_oppositor = duplicates_oppositor
 753        self.revolution_oppositor = revolution_oppositor
 754
 755        # event for removing duplicates
 756        if remove_duplicates_generation_step is None:
 757            def remover(pop: array2D, scores: array1D, gen: int) -> Tuple[
 758                array2D,
 759                array1D
 760            ]:
 761                return pop, scores
 762        else:
 763            
 764            def without_dup(pop: array2D, scores: array1D):  # returns population without dups
 765                _, index_of_dups = np.unique(pop, axis=0, return_index=True)
 766                return pop[index_of_dups], scores[index_of_dups], scores.size - index_of_dups.size
 767            
 768            if self.dup_oppositor is None:  # if there is no dup_oppositor, use random creator
 769                def remover(pop: array2D, scores: array1D, gen: int) -> Tuple[
 770                    array2D,
 771                    array1D
 772                ]:
 773                    if gen % remove_duplicates_generation_step != 0:
 774                        return pop, scores
 775
 776                    pp, sc, count_to_create = without_dup(pop, scores)  # pop without dups
 777                    
 778                    if count_to_create == 0:
 779                        if SHOW_PROGRESS:
 780                            show_progress(t, self.max_iterations,
 781                                      f"GA is running...{t} gen from {self.max_iterations}. No dups!")
 782                        return pop, scores
 783
 784                    pp2 = SampleInitializers.CreateSamples(self.creator, count_to_create)  # new pop elements
 785                    pp2_scores = self.set_function(pp2)  # new elements values
 786
 787                    if SHOW_PROGRESS:
 788                        show_progress(t, self.max_iterations,
 789                                      f"GA is running...{t} gen from {self.max_iterations}. Kill dups!")
 790                    
 791                    new_pop = np.vstack((pp, pp2))
 792                    new_scores = np.concatenate((sc, pp2_scores))
 793
 794                    args_to_sort = new_scores.argsort()
 795                    return new_pop[args_to_sort], new_scores[args_to_sort]
 796
 797            else:  # using oppositors
 798                assert callable(self.dup_oppositor)
 799
 800                def remover(pop: np.ndarray, scores: np.ndarray, gen: int) -> Tuple[
 801                    np.ndarray,
 802                    np.ndarray
 803                ]:
 804                    if gen % remove_duplicates_generation_step != 0:
 805                        return pop, scores
 806
 807                    pp, sc, count_to_create = without_dup(pop, scores)  # pop without dups
 808
 809                    if count_to_create == 0:
 810                        if SHOW_PROGRESS:
 811                            show_progress(t, self.max_iterations,
 812                                          f"GA is running...{t} gen from {self.max_iterations}. No dups!")
 813                        return pop, scores
 814
 815                    if count_to_create > sc.size:
 816                        raise Exception(
 817                            f"Too many duplicates at generation {gen} ({count_to_create} > {sc.size}), cannot oppose"
 818                        )
 819
 820                    # oppose count_to_create worse elements
 821                    pp2 = OppositionOperators.Reflect(pp[-count_to_create:], self.dup_oppositor)  # new pop elements
 822                    pp2_scores = self.set_function(pp2)  # new elements values
 823
 824                    if SHOW_PROGRESS:
 825                        show_progress(t, self.max_iterations,
 826                                      f"GA is running...{t} gen from {self.max_iterations}. Kill dups!")
 827
 828                    new_pop = np.vstack((pp, pp2))
 829                    new_scores = np.concatenate((sc, pp2_scores))
 830
 831                    args_to_sort = new_scores.argsort()
 832                    return new_pop[args_to_sort], new_scores[args_to_sort]
 833
 834        # event for revolution
 835        if revolution_after_stagnation_step is None:
 836            def revolution(pop: array2D, scores: array1D, stagnation_count: int) -> Tuple[
 837                array2D,
 838                array1D
 839            ]:
 840                return pop, scores
 841        else:
 842            if revolution_oppositor is None:
 843                raise Exception(
 844                    f"How can I make revolution each {revolution_after_stagnation_step} stagnation steps "
 845                    f"if revolution_oppositor is None (not defined)?"
 846                )
 847            assert callable(revolution_oppositor)
 848
 849            from OppOpPopInit import OppositionOperators
 850            
 851            def revolution(pop: array2D, scores: array1D, stagnation_count: int) -> Tuple[
 852                array2D,
 853                array1D
 854            ]:
 855                if stagnation_count < revolution_after_stagnation_step:
 856                    return pop, scores
 857                part = int(pop.shape[0]*revolution_part)
 858                
 859                pp2 = OppositionOperators.Reflect(pop[-part:], self.revolution_oppositor)
 860                pp2_scores = self.set_function(pp2)
 861
 862                combined = np.vstack((pop, pp2))
 863                combined_scores = np.concatenate((scores, pp2_scores))
 864                args = combined_scores.argsort()
 865
 866                if SHOW_PROGRESS:
 867                    show_progress(t, self.max_iterations,
 868                                  f"GA is running...{t} gen from {self.max_iterations}. Revolution!")
 869
 870                args = args[:scores.size]
 871                return combined[args], combined_scores[args]
 872
 873        #enregion
 874
 875        #
 876        #
 877        #  START ALGORITHM LOGIC
 878        #
 879        #
 880
 881        # Report
 882        self._clear_report()  # clear old report objects
 883        self._init_report()
 884
 885        # initialization of pop
 886        
 887        if start_generation.variables is None:
 888
 889            from OppOpPopInit import init_population
 890
 891            real_pop_size = self.population_size * pop_coef
 892
 893            # pop = np.empty((real_pop_size, self.dim))
 894            scores = np.empty(real_pop_size)
 895            
 896            pop = init_population(
 897                samples_count=real_pop_size,
 898                creator=self.creator,
 899                oppositors=self.init_oppositors
 900            )
 901
 902            if self.funtimeout and self.funtimeout > 0:  # perform simulation
 903            
 904                time_counter = 0
 905
 906                for p in range(0, real_pop_size):
 907                    # simulation returns exception or func value -- check the time of evaluating
 908                    value, eval_time = self._simulate(pop[p])
 909                    scores[p] = value
 910                    time_counter += eval_time
 911
 912                if enable_printing:
 913                    print(
 914                        f"\nSim: Average time of function evaluating (secs): "
 915                        f"{time_counter/real_pop_size} (total = {time_counter})\n"
 916                    )
 917            else:
 918
 919                eval_time = time.time()
 920                scores = self.set_function(pop)
 921                eval_time = time.time() - eval_time
 922                if enable_printing:
 923                    print(
 924                        f"\nSet: Average time of function evaluating (secs): "
 925                        f"{eval_time/real_pop_size} (total = {eval_time})\n"
 926                    )
 927                
 928        else:
 929
 930            self.population_size = start_generation.variables.shape[0]
 931            self._set_elit_count(self.population_size, self.param.elit_ratio)
 932            self._set_parents_count(self.param.parents_portion)
 933
 934            pop = start_generation.variables
 935
 936            if start_generation.scores is None:
 937
 938                _time = time.time()
 939                scores = self.set_function(pop)
 940                _time = time.time() - _time
 941
 942                if enable_printing:
 943                    print(
 944                        f'\nFirst scores are made from gotten variables '
 945                        f'(by {_time} secs, about {_time/scores.size} for each creature)\n'
 946                    )
 947            else:
 948                scores = start_generation.scores
 949                if enable_printing:
 950                    print(f"\nFirst scores are from gotten population\n")
 951
 952        # Initialization by select bests and local_descent
 953        
 954        pop, scores = initializer_func(pop, scores)
 955
 956        # first sort
 957        args_to_sort = scores.argsort()
 958        pop = pop[args_to_sort]
 959        scores = scores[args_to_sort]
 960        self._update_report(scores)
 961
 962        self.population_size = scores.size
 963        self.best_function = scores[0]
 964
 965        if enable_printing:
 966            print(
 967                f"Best score before optimization: {self.best_function}"
 968            )
 969
 970        t: int = 1
 971        count_stagnation: int = 0
 972        """iterations without progress"""
 973        reason_to_stop: Optional[str] = None
 974
 975        # gets indexes of 2 parents to crossover
 976        if studEA:
 977            get_parents_inds = lambda: (0, random.randrange(1, self.parents_count))
 978        else:
 979            get_parents_inds = lambda: random_indexes_pair(self.parents_count)
 980
 981        #  while no reason to stop
 982        while True:
 983
 984            if count_stagnation > self.max_stagnations:
 985                reason_to_stop = f"limit of fails: {count_stagnation}"
 986            elif t == self.max_iterations:
 987                reason_to_stop = f'limit of iterations: {t}'
 988            elif stop_by_val(self.best_function):
 989                reason_to_stop = f"stop value reached: {self.best_function} <= {stop_when_reached}"
 990            elif time_is_done():
 991                reason_to_stop = f'time is done: {time.time() - start_time} >= {time_limit_secs}'
 992
 993            if reason_to_stop is not None:
 994                if SHOW_PROGRESS:
 995                    show_progress(t, self.max_iterations, f"GA is running... STOP! {reason_to_stop}")
 996                break
 997
 998            if scores[0] < self.best_function:  # if there is progress
 999                count_stagnation = 0
1000                self.best_function = scores[0]
1001            else:
1002                count_stagnation += 1
1003
1004            if SHOW_PROGRESS:
1005                show_progress(
1006                    t,
1007                    self.max_iterations,
1008                    f"GA is running...{t} gen from {self.max_iterations}...best value = {self.best_function}"
1009                )
1010
1011            # Select parents
1012            
1013            par: array2D = np.empty((self.parents_count, self.dim))
1014            """samples chosen to create new samples"""
1015            par_scores: array1D = np.empty(self.parents_count)
1016
1017            elit_slice = slice(None, self.elit_count)
1018            """elit parents"""
1019
1020            # copy needs because the generation will be removed after parents selection
1021            par[elit_slice] = pop[elit_slice].copy()
1022            par_scores[elit_slice] = scores[elit_slice].copy()
1023
1024            new_par_inds = (
1025                self.selection(
1026                    scores[self.elit_count:],
1027                    self.parents_count - self.elit_count
1028                ).astype(np.int16) + self.elit_count
1029            )
1030            """non-elit parents indexes"""
1031            #new_par_inds = self.selection(scores, self.parents_count - self.elit_count).astype(np.int16)
1032            par_slice = slice(self.elit_count, self.parents_count)
1033            par[par_slice] = pop[new_par_inds].copy()
1034            par_scores[par_slice] = scores[new_par_inds].copy()
1035
1036            pop = np.empty((self.population_size, self.dim))
1037            """new generation"""
1038            scores = np.empty(self.population_size)
1039            """new generation scores"""
1040
1041            parents_slice = slice(None, self.parents_count)
1042            pop[parents_slice] = par
1043            scores[parents_slice] = par_scores
1044
1045            if self.fill_children is None:  # default fill children behaviour
1046                DO_MUTATION = self.needs_mutation
1047                for k in range(self.parents_count, self.population_size, 2):
1048
1049                    r1, r2 = get_parents_inds()
1050
1051                    pvar1 = pop[r1]  # equal to par[r1], but better for cache
1052                    pvar2 = pop[r2]
1053
1054                    ch1, ch2 = self.crossover(pvar1, pvar2)
1055
1056                    if DO_MUTATION:
1057                        ch1 = self.mut(ch1)
1058                        ch2 = self.mut_middle(ch2, pvar1, pvar2)
1059
1060                    pop[k] = ch1
1061                    pop[k+1] = ch2
1062            else:  # custom behaviour
1063                self.fill_children(pop, self.parents_count)
1064
1065            if apply_function_to_parents:
1066                scores = self.set_function(pop)
1067            else:
1068                scores[self.parents_count:] = self.set_function(pop[self.parents_count:])
1069            
1070            # remove duplicates
1071            pop, scores = remover(pop, scores, t)
1072            # revolution
1073            pop, scores = revolution(pop, scores, count_stagnation)
1074
1075            # make population better
1076            args_to_sort = scores.argsort()
1077            pop = pop[args_to_sort]
1078            scores = scores[args_to_sort]
1079            self._update_report(scores)
1080
1081            # callback it
1082            total_callback(t, self.report, pop, scores)
1083            total_middle_callback()
1084
1085            t += 1
1086
1087        self.best_function = fast_min(scores[0], self.best_function)
1088
1089        last_generation = Generation(pop, scores)
1090        self.result = GAResult(last_generation)
1091
1092        if save_last_generation_as is not None:
1093            last_generation.save(save_last_generation_as)
1094
1095        if enable_printing:
1096            show = ' ' * 200
1097            sys.stdout.write(
1098                f'\r{show}\n'
1099                f'\r The best found solution:\n {pop[0]}'
1100                f'\n\n Objective function:\n {self.best_function}\n'
1101                f'\n Used generations: {len(self.report)}'
1102                f'\n Used time: {time.time() - start_time:.3g} seconds\n'
1103            )
1104            sys.stdout.flush() 
1105        
1106        if not no_plot:
1107            self.plot_results()
1108
1109        if enable_printing:
1110            if reason_to_stop is not None and 'iterations' not in reason_to_stop:
1111                sys.stdout.write(
1112                    f'\nWarning: GA is terminated because of {reason_to_stop}'
1113                )
1114
1115        return self.result
1116
1117    #endregion
1118
1119    #region PLOTTING
1120
1121    def plot_results(
1122        self,
1123        title: str = 'Genetic Algorithm',
1124        save_as: Optional[str] = None,
1125        dpi: int = 200,
1126        main_color: str = 'blue'
1127     ):
1128        """
1129        Simple plot of self.report (if not empty)
1130        """
1131        if len(self.report) == 0:
1132            sys.stdout.write("No results to plot!\n")
1133            return
1134
1135        plot_several_lines(
1136            lines=[self.report],
1137            colors=[main_color],
1138            labels=['best of generation'],
1139            linewidths=[2],
1140            title=title,
1141            xlabel='Generation',
1142            ylabel='Minimized function',
1143            save_as=save_as,
1144            dpi=dpi
1145        )
1146
1147    def plot_generation_scores(self, title: str = 'Last generation scores', save_as: Optional[str] = None):
1148        """
1149        Plots barplot of scores of last population
1150        """
1151
1152        if not hasattr(self, 'result'):
1153            raise Exception(
1154                "There is no 'result' field into ga object! Before plotting generation u need to run seaching process"
1155            )
1156
1157        plot_pop_scores(self.result.last_generation.scores, title, save_as)
1158
1159    #endregion
1160
1161    #region MUTATION
1162    def mut(self, x: array1D):
1163        """
1164        just mutation
1165        """
1166
1167        for i in self.indexes_int_mut:
1168            if random.random() < self.prob_mut_discrete:
1169                x[i] = self.discrete_mutation(x[i], *self.var_bounds[i])
1170
1171        for i in self.indexes_float_mut:                
1172            if random.random() < self.prob_mut:
1173                x[i] = self.real_mutation(x[i], *self.var_bounds[i])
1174            
1175        return x
1176
1177    def mut_middle(self, x: array1D, p1: array1D, p2: array1D):
1178        """
1179        mutation oriented on parents
1180        """
1181        for i in self.indexes_int_mut:
1182
1183            if random.random() < self.prob_mut_discrete:
1184                v1, v2 = p1[i], p2[i]
1185                if v1 < v2:
1186                    x[i] = random.randint(v1, v2)
1187                elif v1 > v2:
1188                    x[i] = random.randint(v2, v1)
1189                else:
1190                    x[i] = random.randint(*self.var_bounds[i])
1191                        
1192        for i in self.indexes_float_mut:                
1193            if random.random() < self.prob_mut:
1194                v1, v2 = p1[i], p2[i]
1195                if v1 != v2:
1196                    x[i] = random.uniform(v1, v2)
1197                else:
1198                    x[i] = random.uniform(*self.var_bounds[i])
1199        return x
1200
1201    #endregion
1202
1203    #region Set functions
1204
1205    @staticmethod
1206    def default_set_function(function_for_set: Callable[[array1D], float]):
1207        """
1208        simple function for creating set_function 
1209        function_for_set just applies to each row of population
1210        """
1211        def func(matrix: array2D):
1212            return np.array(
1213                [function_for_set(row) for row in matrix]
1214            )
1215        return func
1216
1217    @staticmethod
1218    def vectorized_set_function(function_for_set: Callable[[array1D], float]):
1219        """
1220        works like default, but faster for big populations and slower for little
1221        function_for_set just applyes to each row of population
1222        """
1223
1224        func = np.vectorize(function_for_set, signature='(n)->()')
1225        return func
1226
1227    @staticmethod
1228    def set_function_multiprocess(function_for_set: Callable[[array1D], float], n_jobs: int = -1):
1229        """
1230        like function_for_set but uses joblib with n_jobs (-1 goes to count of available processors)
1231        """
1232        try:
1233            from joblib import Parallel, delayed
1234        except ModuleNotFoundError:
1235            raise ModuleNotFoundError(
1236                "this additional feature requires joblib package," 
1237                "run `pip install joblib` or `pip install --upgrade geneticalgorithm2[full]`" 
1238                "or use another set function"
1239            )
1240
1241        def func(matrix: array2D):
1242            result = Parallel(n_jobs=n_jobs)(delayed(function_for_set)(matrix[i]) for i in range(matrix.shape[0]))
1243            return np.array(result)
1244        return func
1245            
1246    #endregion
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260            
1261            
VARIABLE_TYPE: typing_extensions.TypeAlias = typing.Literal['int', 'real', 'bool']

The variable type for a single dimension or for all dimensions; it determines how the values are discretized:

real: double numbers

int: integer number only

bool: in fact, an integer with bounds [0, 1]
class GeneticAlgorithm2:
  55class GeneticAlgorithm2:
  56    """
  57    Genetic algorithm optimization process
  58    """
  59    
  60    default_params = AlgorithmParams()
  61    PROGRESS_BAR_LEN = 20
  62    """max count of symbols in the progress bar"""
  63
  64    @property
  65    def output_dict(self):
  66        warnings.warn(
  67            "'output_dict' is deprecated and will be removed at version 7 \n"
  68            "use 'result' instead"
  69        )
  70        return self.result
  71
  72    @property
  73    def needs_mutation(self) -> bool:
  74        """whether the mutation is required"""
  75        return self.prob_mut > 0 or self.prob_mut_discrete > 0
  76
  77    #region INIT
  78
  79    def __init__(
  80        self,
  81        function: FunctionToMinimize = None,
  82
  83        dimension: int = 0,
  84        variable_type: Union[VARIABLE_TYPE, Sequence[VARIABLE_TYPE]] = 'bool',
  85        variable_boundaries: Optional[Union[array2D, Sequence[Tuple[float, float]]]] = None,
  86
  87        variable_type_mixed=None,
  88
  89        function_timeout: Optional[float] = None,
  90        algorithm_parameters: Union[AlgorithmParams, Dict[str, Any]] = default_params
  91    ):
  92        """
  93        initializes the GA object and performs main checks
  94
  95        Args:
  96            function: the given objective function to be minimized -- deprecated and moved to run() method
  97            dimension: the number of decision variables, the population samples dimension
  98
  99            variable_type: string means the variable type for all variables,
 100                for mixed types use sequence of strings of type for each variable
 101
 102            variable_boundaries: leave it None if variable_type is 'bool';
 103                otherwise provide a sequence of tuples of length two as boundaries for each variable;
 104                the length of the array must be equal dimension.
 105                For example, ([0,100], [0,200]) determines
 106                    lower boundary 0 and upper boundary 100 for first
 107                    and upper boundary 200 for second variable
 108                    and dimension must be 2.
 109
 110            variable_type_mixed -- deprecated
 111
 112            function_timeout: if the given function does not provide
 113                output before function_timeout (unit is seconds) the algorithm raises error.
 114                For example, when there is an infinite loop in the given function.
 115                `None` means disabling -- deprecated and moved to run()
 116
 117            algorithm_parameters: AlgorithmParams object or usual dictionary with algorithm parameter;
 118                it is not mandatory to provide all possible parameters
 119
 120        Notes:
 121            - This implementation minimizes the given objective function.
 122            For maximization u can multiply the function by -1 (for instance): the absolute
 123                value of the output would be the actual objective function
 124
 125        for more details and examples of implementation please visit:
 126            https://github.com/PasaOpasen/geneticalgorithm2
 127  
 128        """
 129
 130        # all default fields
 131
 132        # self.crossover: Callable[[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray]] = None
 133        # self.real_mutation: Callable[[float, float, float], float] = None
 134        # self.discrete_mutation: Callable[[int, int, int], int] = None
 135        # self.selection: Callable[[np.ndarray, int], np.ndarray] = None
 136
 137        self.result = None
 138        self.best_function = None
 139
 140        self.revolution_oppositor = None
 141        self.dup_oppositor = None
 142        self.creator = None
 143        self.init_oppositors = None
 144
 145        self.f: Callable[[array1D], float] = None
 146        self.funtimeout: float = None
 147
 148        self.set_function: Callable[[np.ndarray], np.ndarray] = None
 149
 150        # self.dim: int = None
 151        self.var_bounds: List[Tuple[Union[int, float], Union[int, float]]] = None
 152        # self.indexes_int: np.ndarray = None
 153        # self.indexes_float: np.ndarray = None
 154
 155        self.checked_reports: List[Tuple[str, Callable[[array1D], None]]] = None
 156
 157        self.population_size: int = None
 158        self.progress_stream = None
 159
 160        # input algorithm's parameters
 161
 162        assert isinstance(algorithm_parameters, (dict, AlgorithmParams)), (
 163            "algorithm_parameters must be dict or AlgorithmParams object"
 164        )
 165        if not isinstance(algorithm_parameters, AlgorithmParams):
 166            algorithm_parameters = AlgorithmParams.from_dict(algorithm_parameters)
 167        algorithm_parameters.validate()
 168        self.param = algorithm_parameters
 169
 170        self.crossover, self.real_mutation, self.discrete_mutation, self.selection = algorithm_parameters.get_CMS_funcs()
 171
 172        # dimension and area bounds
 173        self.dim = int(dimension)
 174        assert self.dim > 0, f"dimension count must be int and >0, gotten {dimension}"
 175
 176        if variable_type_mixed is not None:
 177            warnings.warn(
 178                f"argument variable_type_mixed is deprecated and will be removed at version 7\n "
 179                f"use variable_type={tuple(variable_type_mixed)} instead"
 180            )
 181            variable_type = variable_type_mixed
 182        self._set_types_indexes(variable_type)  # types indexes
 183        self._set_var_boundaries(variable_type, variable_boundaries)  # input variables' boundaries
 184
 185        # fix mutation probs
 186
 187        assert can_be_prob(self.param.mutation_probability)
 188        self.prob_mut = self.param.mutation_probability
 189        assert self.param.mutation_discrete_probability is None or can_be_prob(self.param.mutation_discrete_probability)
 190        self.prob_mut_discrete = self.param.mutation_discrete_probability or self.prob_mut
 191
 192        if self.param.crossover_probability is not None:
 193            warnings.warn(
 194                f"crossover_probability is deprecated and will be removed in version 7. "
 195                f"Reason: it's old and has no sense"
 196            )
 197
 198        #############################################################
 199        # input function
 200        if function:
 201            warnings.warn(
 202                f"function is deprecated in init constructor and will be removed in version 7. "
 203                f"Move this argument to run() method"
 204            )
 205            self._check_function(function)
 206        if function_timeout:
 207            warnings.warn(
 208                f"function_timeout is deprecated in init constructor and will be removed in version 7. "
 209                f"Move this argument to run() method"
 210            )
 211            self._check_function_timeout(function_timeout)
 212
 213        #############################################################
 214        
 215        self.population_size = int(self.param.population_size)
 216        self._set_parents_count(self.param.parents_portion)
 217        self._set_elit_count(self.population_size, self.param.elit_ratio)
 218        assert self.parents_count >= self.elit_count, (
 219            f"\n number of parents ({self.parents_count}) "
 220            f"must be greater than number of elits ({self.elit_count})"
 221        )
 222
 223        self._set_max_iterations()
 224
 225        self._set_report()
 226
 227        # specify this function to speed up or change default behaviour
 228        self.fill_children: Optional[Callable[[array2D, int], None]] = None
 229        """
 230        custom function which adds children for population POP 
 231            where POP[:parents_count] are parents lines and next lines are for children
 232        """
 233
 234    def _set_types_indexes(self, variable_type: Union[str, Sequence[str]]):
 235
 236        indexes = np.arange(self.dim)
 237        self.indexes_int = np.array([])
 238        self.indexes_float = np.array([])
 239
 240        assert_message = (
 241            f"\n variable_type must be 'bool', 'int', 'real' or a sequence with 'int' and 'real', got {variable_type}"
 242        )
 243
 244        if isinstance(variable_type, str):
 245            assert (variable_type in VARIABLE_TYPE.__args__), assert_message
 246            if variable_type == 'real':
 247                self.indexes_float = indexes
 248            else:
 249                self.indexes_int = indexes
 250
 251        else:  # sequence case
 252
 253            assert len(variable_type) == self.dim, (
 254                f"\n variable_type must have a length equal dimension. "
 255                f"Should be {self.dim}, got {len(variable_type)}"
 256            )
 257            assert 'bool' not in variable_type, (
 258                "don't use 'bool' if variable_type is a sequence, "
 259                "for 'boolean' case use 'int' and specify boundary as (0,1)"
 260            )
 261            assert all(v in VARIABLE_TYPE.__args__ for v in variable_type), assert_message
 262
 263            vartypes = np.array(variable_type)
 264            self.indexes_int = indexes[vartypes == 'int']
 265            self.indexes_float = indexes[vartypes == 'real']
 266
 267    def _set_var_boundaries(
 268        self,
 269        variable_type: Union[str, Sequence[str]],
 270        variable_boundaries
 271    ):
 272        if isinstance(variable_type, str) and variable_type == 'bool':
 273            self.var_bounds = [(0, 1)] * self.dim
 274        else:
 275
 276            if is_numpy(variable_boundaries):
 277                assert variable_boundaries.shape == (self.dim, 2), (
 278                    f"\n if variable_boundaries is numpy array, it must be with shape (dim, 2)"
 279                )
 280            else:
 281                assert len(variable_boundaries) == self.dim and all((len(t) == 2 for t in variable_boundaries)), (
 282                    "\n if variable_boundaries is sequence, "
 283                    "it must be with len dim and boundary for each variable must be a tuple of length two"
 284                )
 285
 286            for i in variable_boundaries:
 287                assert i[0] <= i[1], "\n lower_boundaries must be smaller than upper_boundaries [lower,upper]"
 288
 289            self.var_bounds = [(i[0], i[1]) for i in variable_boundaries]
 290
 291    def _set_parents_count(self, parents_portion: float):
 292
 293        self.parents_count = int(parents_portion * self.population_size)
 294        assert self.population_size >= self.parents_count > 1, (
 295            f'parents count {self.parents_count} cannot be less than population size {self.population_size}'
 296        )
 297        trl = self.population_size - self.parents_count
 298        if trl % 2 != 0:
 299            self.parents_count += 1
 300
 301    def _set_elit_count(self, pop_size: int, elit_ratio: Union[float, int]):
 302
 303        assert elit_ratio >= 0
 304        self.elit_count = elit_ratio if isinstance(elit_ratio, str) else math.ceil(pop_size*elit_ratio)
 305
 306    def _set_max_iterations(self):
 307
 308        if self.param.max_num_iteration is None:
 309            iterate = 0
 310            for i in range(0, self.dim):
 311                bound_min, bound_max = self.var_bounds[i]
 312                var_space = bound_max - bound_min
 313                if i in self.indexes_int:
 314                    iterate += var_space * self.dim * (100 / self.population_size)
 315                else:
 316                    iterate += var_space * 50 * (100 / self.population_size)
 317            iterate = int(iterate)
 318            if (iterate * self.population_size) > 10000000:
 319                iterate = 10000000 / self.population_size
 320
 321            self.max_iterations = fast_min(iterate, 8000)
 322        else:
 323            assert self.param.max_num_iteration > 0
 324            self.max_iterations = math.ceil(self.param.max_num_iteration)
 325
 326        max_it = self.param.max_iteration_without_improv
 327        if max_it is None:
 328            self.max_stagnations = self.max_iterations + 1
 329        else:
 330            self.max_stagnations = math.ceil(max_it)
 331
 332    def _check_function(self, function: Callable[[array1D], float]):
 333        assert callable(function), "function must be callable!"
 334        self.f = function
 335
 336    def _check_function_timeout(self, function_timeout: Optional[float]):
 337
 338        if function_timeout is not None and function_timeout > 0:
 339            try:
 340                from func_timeout import func_timeout, FunctionTimedOut
 341            except ModuleNotFoundError:
 342                raise ModuleNotFoundError(
 343                    "function_timeout > 0 needs additional package func_timeout\n"
 344                    "run `python -m pip install func_timeout`\n"
 345                    "or disable this parameter: function_timeout=None"
 346                )
 347
 348        self.funtimeout = None if function_timeout is None else float(function_timeout)
 349
 350    #endregion
 351
 352    #region REPORT
 353
 354    def _set_report(self):
 355        """
 356        creates default report checker
 357        """
 358        self.checked_reports = [
 359            # item 0 cuz scores will be sorted and min item is items[0]
 360            ('report', operator.itemgetter(0))
 361        ]
 362
 363    def _clear_report(self):
 364        """
 365        removes all report objects
 366        """
 367        fields = [f for f in vars(self).keys() if f.startswith('report')]
 368        for attr in fields:
 369            delattr(self, attr)
 370
 371    def _init_report(self):
 372        """
 373        makes empty report fields
 374        """
 375        for name, _ in self.checked_reports:
 376            setattr(self, name, [])
 377
 378    def _update_report(self, scores: array1D):
 379        """
 380        append report value to the end of field
 381        """
 382        for name, func in self.checked_reports:
 383            getattr(self, name).append(
 384                func(scores)
 385            )
 386
 387    #endregion
 388
 389    #region RUN METHODS
 390
 391    def _progress(self, count: int, total: int, status: str = ''):
 392
 393        part = count / total
 394
 395        filled_len = round(GeneticAlgorithm2.PROGRESS_BAR_LEN * part)
 396        percents = round(100.0 * part, 1)
 397        bar = '|' * filled_len + '_' * (GeneticAlgorithm2.PROGRESS_BAR_LEN - filled_len)
 398
 399        self.progress_stream.write('\r%s %s%s %s' % (bar, percents, '%', status))
 400        self.progress_stream.flush()
 401
 402    def __str__(self):
 403        return f"Genetic algorithm object with parameters {self.param}"
 404
 405    def __repr__(self):
 406        return self.__str__()
 407
 408    def _simulate(self, sample: array1D):
 409
 410        from func_timeout import func_timeout, FunctionTimedOut
 411
 412        obj = None
 413        eval_time = time.time()
 414        try:
 415            obj = func_timeout(
 416                self.funtimeout,
 417                lambda: self.f(sample)
 418            )
 419        except FunctionTimedOut:
 420            print("given function is not applicable")
 421        eval_time = time.time() - eval_time
 422
 423        assert obj is not None, (
 424            f"the given function was running like {eval_time} seconds and does not provide any output"
 425        )
 426
 427        tp = type(obj)
 428        assert (
 429            tp in (int, float) or np.issubdtype(tp, np.floating) or np.issubdtype(tp, np.integer)
 430        ), f"Minimized function should return a number, but got '{obj}' object with type {tp}"
 431
 432        return obj, eval_time
 433
 434    def _set_mutation_indexes(self, mutation_indexes: Optional[Iterable[int]]):
 435
 436        if mutation_indexes is None:
 437            self.indexes_float_mut = self.indexes_float
 438            self.indexes_int_mut = self.indexes_int
 439        else:
 440            tmp_indexes = set(mutation_indexes)
 441            self.indexes_int_mut = np.array(list(tmp_indexes.intersection(self.indexes_int)))
 442            self.indexes_float_mut = np.array(list(tmp_indexes.intersection(self.indexes_float)))
 443
 444            if self.indexes_float_mut.size == 0 and self.indexes_int_mut.size == 0:
 445                warnings.warn(f"No mutation dimensions!!! Check ur mutation indexes!!")
 446
 447    #@profile
 448    def run(
 449        self,
 450        no_plot: bool = False,
 451        disable_printing: bool = False,
 452        progress_bar_stream: Optional[str] = 'stdout',
 453
 454        # deprecated
 455        disable_progress_bar: bool = False,
 456
 457        function: FunctionToMinimize = None,
 458        function_timeout: Optional[float] = None,
 459
 460        set_function: SetFunctionToMinimize = None,
 461        apply_function_to_parents: bool = False,
 462        start_generation: GenerationConvertible = Generation(),
 463        studEA: bool = False,
 464        mutation_indexes: Optional[Iterable[int]] = None,
 465
 466        init_creator: Optional[CreatorFunc] = None,
 467        init_oppositors: Optional[Sequence[OppositorFunc]] = None,
 468
 469        duplicates_oppositor: Optional[OppositorFunc] = None,
 470        remove_duplicates_generation_step: Optional[int] = None,
 471
 472        revolution_oppositor: Optional[OppositorFunc] = None,
 473        revolution_after_stagnation_step: Optional[int] = None,
 474        revolution_part: float = 0.3,
 475
 476        population_initializer: Tuple[
 477            int, PopulationModifier
 478        ] = get_population_initializer(select_best_of=1, local_optimization_step='never', local_optimizer=None),
 479
 480        stop_when_reached: Optional[float] = None,
 481        callbacks: Optional[Sequence[SimpleCallbackFunc]] = None,
 482        middle_callbacks: Optional[Sequence[MiddleCallbackFunc]] = None,  #+
 483        time_limit_secs: Optional[float] = None,
 484        save_last_generation_as: Optional[str] = None,
 485        seed: Optional[int] = None
 486    ) -> GAResult:
 487        """
 488        runs optimization process
 489
 490        Args:
 491            no_plot: do not plot results using matplotlib by default
 492
 493            disable_printing: do not print log info of optimization process (except progress bar)
 494
 495            progress_bar_stream: 'stdout', 'stderr' or None to disable progress bar;
 496                disabling progress bar can speed up the optimization process in many cases
 497
 498            disable_progress_bar: deprecated
 499
 500            function: the given objective function (sample -> its score) to be minimized;
 501
 502            function_timeout: if the given function does not provide
 503                output before function_timeout (unit is seconds) the algorithm raises error.
 504                For example, when there is an infinite loop in the given function.
 505                `None` means disabling
 506
 507            set_function: set function (all samples -> score per sample) to be used instead of usual function
 508                (usually for optimization purposes)
 509
 510            apply_function_to_parents: whether to apply function to parents from previous generation (if it's needed), 
 511                it can be needed at working with games agents, but for other tasks will just waste time
 512
 513            start_generation: initial generation object of any `GenerationConvertible` type
 514
 515            studEA: using stud EA strategy (crossover with best object always)
 516
 517            mutation_indexes: indexes of dimensions where mutation can be performed (all dimensions by default)
 518
 519            init_creator: the function creates population samples.
 520                By default -- random uniform for real variables and random uniform for int
 521            init_oppositors: the list of oppositors creates oppositions for base population. No by default
 522
 523            duplicates_oppositor: oppositor for applying after duplicates removing.
 524                By default -- using just random initializer from creator
 525            remove_duplicates_generation_step: step for removing duplicates (have a sense with discrete tasks).
 526                No by default
 527
 528            revolution_oppositor: oppositor for revolution time. No by default
 529            revolution_after_stagnation_step: create revolution after this generations of stagnation. No by default
 530            revolution_part: float, the part of generation to being oppose. By default is 0.3
 531
 532            population_initializer: object for actions at population initialization step
 533                to create better start population. See doc
 534
 535            stop_when_reached: stop searching after reaching this value 
 536                (it can be potential minimum or something else)
 537
 538            callbacks: sequence of callback functions with structure:
 539                (generation_number, report_list, last_population, last_scores) -> do some action
 540
 541            middle_callbacks: sequence of functions made by `MiddleCallback` class
 542
 543            time_limit_secs: limit time of working (in seconds);
 544                `None` means no time limit (limit will be only for count of generation and so on)
 545
 546            save_last_generation_as: path to .npz file 
 547                for saving last_generation as numpy dictionary like
 548                {'population': 2D-array, 'scores': 1D-array}; 
 549                None disables this option
 550
 551            seed: random seed (None if doesn't matter)
 552
 553        Returns:
 554            `GAResult` object;
 555            also fills the self.report and self.result with many report information
 556
 557        Notes:
 558            - if `function_timeout` is enabled then `function` must be set
 559            - it would be more logical to use params like `studEA` as an algorithm parameter, 
 560                but `run()`-way can be more convenient for real using
 561        """
 562
 563        if disable_progress_bar:
 564            warnings.warn(
 565                f"disable_progress_bar is deprecated and will be removed in version 7, "
 566                f"use probress_bar_stream=None to disable progress bar"
 567            )
 568            progress_bar_stream = None
 569
 570        enable_printing: bool = not disable_printing
 571
 572        start_generation = Generation.from_object(self.dim, start_generation)
 573
 574        assert is_current_gen_number(revolution_after_stagnation_step), "must be None or int and >0"
 575        assert is_current_gen_number(remove_duplicates_generation_step), "must be None or int and >0"
 576        assert can_be_prob(revolution_part), f"revolution_part must be in [0,1], not {revolution_part}"
 577        assert stop_when_reached is None or isinstance(stop_when_reached, (int, float))
 578        assert isinstance(callbacks, collections.abc.Sequence) or callbacks is None, (
 579            "callbacks should be a list of callbacks functions"
 580        )
 581        assert isinstance(middle_callbacks, collections.abc.Sequence) or middle_callbacks is None, (
 582            "middle_callbacks should be list of MiddleCallbacks functions"
 583        )
 584        assert time_limit_secs is None or time_limit_secs > 0, 'time_limit_secs must be None of number > 0'
 585
 586        self._set_mutation_indexes(mutation_indexes)
 587        from OppOpPopInit import set_seed
 588        set_seed(seed)
 589
 590        # randomstate = np.random.default_rng(random.randint(0, 1000) if seed is None else seed)
 591        # self.randomstate = randomstate
 592
 593        # using bool flag is faster than using empty function with generated string arguments
 594        SHOW_PROGRESS = progress_bar_stream is not None
 595        if SHOW_PROGRESS:
 596
 597            show_progress = lambda t, t2, s: self._progress(t, t2, status=s)
 598
 599            if progress_bar_stream == 'stdout':
 600                self.progress_stream = sys.stdout
 601            elif progress_bar_stream == 'stderr':
 602                self.progress_stream = sys.stderr
 603            else:
 604                raise Exception(
 605                    f"wrong value {progress_bar_stream} of progress_bar_stream, must be 'stdout'/'stderr'/None"
 606                )
 607        else:
 608            show_progress = None
 609
 610        stop_by_val = (
 611            (lambda best_f: False)
 612            if stop_when_reached is None
 613            else (lambda best_f: best_f <= stop_when_reached)
 614        )
 615
 616        t: int = 0
 617        count_stagnation: int = 0
 618        pop: array2D = None
 619        scores: array1D = None
 620
 621        #region CALLBACKS
 622
 623        def get_data():
 624            """
 625            returns all important data about model
 626            """
 627            return MiddleCallbackData(
 628                last_generation=Generation(pop, scores),
 629                current_generation=t,
 630                report_list=self.report,
 631
 632                mutation_prob=self.prob_mut,
 633                mutation_discrete_prob=self.prob_mut_discrete,
 634                mutation=self.real_mutation,
 635                mutation_discrete=self.discrete_mutation,
 636                crossover=self.crossover,
 637                selection=self.selection,
 638
 639                current_stagnation=count_stagnation,
 640                max_stagnation=self.max_stagnations,
 641
 642                parents_portion=self.param.parents_portion,
 643                elit_ratio=self.param.elit_ratio,
 644
 645                set_function=self.set_function,
 646
 647                reason_to_stop=reason_to_stop
 648            )
 649
 650        def set_data(data: MiddleCallbackData):
 651            """
 652            sets data to model
 653            """
 654            nonlocal pop, scores, count_stagnation, reason_to_stop
 655
 656            pop, scores = data.last_generation.variables, data.last_generation.scores
 657            self.population_size = pop.shape[0]
 658
 659            self.param.parents_portion = data.parents_portion
 660            self._set_parents_count(data.parents_portion)
 661
 662            self.param.elit_ratio = data.elit_ratio
 663            self._set_elit_count(self.population_size, data.elit_ratio)
 664
 665            self.prob_mut = data.mutation_prob
 666            self.prob_mut_discrete = data.mutation_discrete_prob
 667            self.real_mutation = data.mutation
 668            self.discrete_mutation = data.mutation_discrete
 669            self.crossover = data.crossover
 670            self.selection = data.selection
 671
 672            count_stagnation = data.current_stagnation
 673            reason_to_stop = data.reason_to_stop
 674            self.max_stagnations = data.max_stagnation
 675
 676            self.set_function = data.set_function
 677
 678        if not callbacks:
 679            total_callback = lambda g, r, lp, ls: None
 680        else:
 681            def total_callback(
 682                generation_number: int,
 683                report_list: List[float],
 684                last_population: array2D,
 685                last_scores: array1D
 686            ):
 687                for cb in callbacks:
 688                    cb(generation_number, report_list, last_population, last_scores)
 689
 690        if not middle_callbacks:
 691            total_middle_callback = lambda: None
 692        else:
 693            def total_middle_callback():
 694                """
 695                applies callbacks and sets new data if there is a sense
 696                """
 697                data = get_data()
 698                flag = False
 699                for cb in middle_callbacks:
 700                    data, has_sense = cb(data)
 701                    if has_sense:
 702                        flag = True
 703                if flag:
 704                    set_data(data)  # update global date if there was real callback step
 705
 706        #endregion
 707
 708        start_time = time.time()
 709        time_is_done = (
 710            (lambda: False)
 711            if time_limit_secs is None
 712            else (lambda: int(time.time() - start_time) >= time_limit_secs)
 713        )
 714
 715        # combine with deprecated parts
 716        function = function or self.f
 717        function_timeout = function_timeout or self.funtimeout
 718
 719        assert function or set_function, 'no function to minimize'
 720        if function:
 721            self._check_function(function)
 722        if function_timeout:
 723            self._check_function_timeout(function_timeout)
 724
 725        self.set_function = set_function or GeneticAlgorithm2.default_set_function(self.f)
 726
 727        #region Initial population, duplicates filter, revolutionary
 728
 729        pop_coef, initializer_func = population_initializer
 730        
 731        # population creator by random or with oppositions
 732        if init_creator is None:
 733
 734            from OppOpPopInit import SampleInitializers
 735
 736            # just uniform random
 737            self.creator = SampleInitializers.Combined(
 738                minimums=[v[0] for v in self.var_bounds],
 739                maximums=[v[1] for v in self.var_bounds],
 740                indexes=(
 741                    self.indexes_int,
 742                    self.indexes_float
 743                ),
 744                creator_initializers=(
 745                    SampleInitializers.RandomInteger,
 746                    SampleInitializers.Uniform
 747                )
 748            )
 749        else:
 750            assert callable(init_creator)
 751            self.creator = init_creator
 752
 753        self.init_oppositors = init_oppositors
 754        self.dup_oppositor = duplicates_oppositor
 755        self.revolution_oppositor = revolution_oppositor
 756
 757        # event for removing duplicates
 758        if remove_duplicates_generation_step is None:
 759            def remover(pop: array2D, scores: array1D, gen: int) -> Tuple[
 760                array2D,
 761                array1D
 762            ]:
 763                return pop, scores
 764        else:
 765            
 766            def without_dup(pop: array2D, scores: array1D):  # returns population without dups
 767                _, index_of_dups = np.unique(pop, axis=0, return_index=True)
 768                return pop[index_of_dups], scores[index_of_dups], scores.size - index_of_dups.size
 769            
 770            if self.dup_oppositor is None:  # if there is no dup_oppositor, use random creator
 771                def remover(pop: array2D, scores: array1D, gen: int) -> Tuple[
 772                    array2D,
 773                    array1D
 774                ]:
 775                    if gen % remove_duplicates_generation_step != 0:
 776                        return pop, scores
 777
 778                    pp, sc, count_to_create = without_dup(pop, scores)  # pop without dups
 779                    
 780                    if count_to_create == 0:
 781                        if SHOW_PROGRESS:
 782                            show_progress(t, self.max_iterations,
 783                                      f"GA is running...{t} gen from {self.max_iterations}. No dups!")
 784                        return pop, scores
 785
 786                    pp2 = SampleInitializers.CreateSamples(self.creator, count_to_create)  # new pop elements
 787                    pp2_scores = self.set_function(pp2)  # new elements values
 788
 789                    if SHOW_PROGRESS:
 790                        show_progress(t, self.max_iterations,
 791                                      f"GA is running...{t} gen from {self.max_iterations}. Kill dups!")
 792                    
 793                    new_pop = np.vstack((pp, pp2))
 794                    new_scores = np.concatenate((sc, pp2_scores))
 795
 796                    args_to_sort = new_scores.argsort()
 797                    return new_pop[args_to_sort], new_scores[args_to_sort]
 798
 799            else:  # using oppositors
 800                assert callable(self.dup_oppositor)
 801
 802                def remover(pop: np.ndarray, scores: np.ndarray, gen: int) -> Tuple[
 803                    np.ndarray,
 804                    np.ndarray
 805                ]:
 806                    if gen % remove_duplicates_generation_step != 0:
 807                        return pop, scores
 808
 809                    pp, sc, count_to_create = without_dup(pop, scores)  # pop without dups
 810
 811                    if count_to_create == 0:
 812                        if SHOW_PROGRESS:
 813                            show_progress(t, self.max_iterations,
 814                                          f"GA is running...{t} gen from {self.max_iterations}. No dups!")
 815                        return pop, scores
 816
 817                    if count_to_create > sc.size:
 818                        raise Exception(
 819                            f"Too many duplicates at generation {gen} ({count_to_create} > {sc.size}), cannot oppose"
 820                        )
 821
 822                    # oppose count_to_create worse elements
 823                    pp2 = OppositionOperators.Reflect(pp[-count_to_create:], self.dup_oppositor)  # new pop elements
 824                    pp2_scores = self.set_function(pp2)  # new elements values
 825
 826                    if SHOW_PROGRESS:
 827                        show_progress(t, self.max_iterations,
 828                                      f"GA is running...{t} gen from {self.max_iterations}. Kill dups!")
 829
 830                    new_pop = np.vstack((pp, pp2))
 831                    new_scores = np.concatenate((sc, pp2_scores))
 832
 833                    args_to_sort = new_scores.argsort()
 834                    return new_pop[args_to_sort], new_scores[args_to_sort]
 835
 836        # event for revolution
 837        if revolution_after_stagnation_step is None:
 838            def revolution(pop: array2D, scores: array1D, stagnation_count: int) -> Tuple[
 839                array2D,
 840                array1D
 841            ]:
 842                return pop, scores
 843        else:
 844            if revolution_oppositor is None:
 845                raise Exception(
 846                    f"How can I make revolution each {revolution_after_stagnation_step} stagnation steps "
 847                    f"if revolution_oppositor is None (not defined)?"
 848                )
 849            assert callable(revolution_oppositor)
 850
 851            from OppOpPopInit import OppositionOperators
 852            
 853            def revolution(pop: array2D, scores: array1D, stagnation_count: int) -> Tuple[
 854                array2D,
 855                array1D
 856            ]:
 857                if stagnation_count < revolution_after_stagnation_step:
 858                    return pop, scores
 859                part = int(pop.shape[0]*revolution_part)
 860                
 861                pp2 = OppositionOperators.Reflect(pop[-part:], self.revolution_oppositor)
 862                pp2_scores = self.set_function(pp2)
 863
 864                combined = np.vstack((pop, pp2))
 865                combined_scores = np.concatenate((scores, pp2_scores))
 866                args = combined_scores.argsort()
 867
 868                if SHOW_PROGRESS:
 869                    show_progress(t, self.max_iterations,
 870                                  f"GA is running...{t} gen from {self.max_iterations}. Revolution!")
 871
 872                args = args[:scores.size]
 873                return combined[args], combined_scores[args]
 874
 875        #enregion
 876
 877        #
 878        #
 879        #  START ALGORITHM LOGIC
 880        #
 881        #
 882
 883        # Report
 884        self._clear_report()  # clear old report objects
 885        self._init_report()
 886
 887        # initialization of pop
 888        
 889        if start_generation.variables is None:
 890
 891            from OppOpPopInit import init_population
 892
 893            real_pop_size = self.population_size * pop_coef
 894
 895            # pop = np.empty((real_pop_size, self.dim))
 896            scores = np.empty(real_pop_size)
 897            
 898            pop = init_population(
 899                samples_count=real_pop_size,
 900                creator=self.creator,
 901                oppositors=self.init_oppositors
 902            )
 903
 904            if self.funtimeout and self.funtimeout > 0:  # perform simulation
 905            
 906                time_counter = 0
 907
 908                for p in range(0, real_pop_size):
 909                    # simulation returns exception or func value -- check the time of evaluating
 910                    value, eval_time = self._simulate(pop[p])
 911                    scores[p] = value
 912                    time_counter += eval_time
 913
 914                if enable_printing:
 915                    print(
 916                        f"\nSim: Average time of function evaluating (secs): "
 917                        f"{time_counter/real_pop_size} (total = {time_counter})\n"
 918                    )
 919            else:
 920
 921                eval_time = time.time()
 922                scores = self.set_function(pop)
 923                eval_time = time.time() - eval_time
 924                if enable_printing:
 925                    print(
 926                        f"\nSet: Average time of function evaluating (secs): "
 927                        f"{eval_time/real_pop_size} (total = {eval_time})\n"
 928                    )
 929                
 930        else:
 931
 932            self.population_size = start_generation.variables.shape[0]
 933            self._set_elit_count(self.population_size, self.param.elit_ratio)
 934            self._set_parents_count(self.param.parents_portion)
 935
 936            pop = start_generation.variables
 937
 938            if start_generation.scores is None:
 939
 940                _time = time.time()
 941                scores = self.set_function(pop)
 942                _time = time.time() - _time
 943
 944                if enable_printing:
 945                    print(
 946                        f'\nFirst scores are made from gotten variables '
 947                        f'(by {_time} secs, about {_time/scores.size} for each creature)\n'
 948                    )
 949            else:
 950                scores = start_generation.scores
 951                if enable_printing:
 952                    print(f"\nFirst scores are from gotten population\n")
 953
 954        # Initialization by select bests and local_descent
 955        
 956        pop, scores = initializer_func(pop, scores)
 957
 958        # first sort
 959        args_to_sort = scores.argsort()
 960        pop = pop[args_to_sort]
 961        scores = scores[args_to_sort]
 962        self._update_report(scores)
 963
 964        self.population_size = scores.size
 965        self.best_function = scores[0]
 966
 967        if enable_printing:
 968            print(
 969                f"Best score before optimization: {self.best_function}"
 970            )
 971
 972        t: int = 1
 973        count_stagnation: int = 0
 974        """iterations without progress"""
 975        reason_to_stop: Optional[str] = None
 976
 977        # gets indexes of 2 parents to crossover
 978        if studEA:
 979            get_parents_inds = lambda: (0, random.randrange(1, self.parents_count))
 980        else:
 981            get_parents_inds = lambda: random_indexes_pair(self.parents_count)
 982
 983        #  while no reason to stop
 984        while True:
 985
 986            if count_stagnation > self.max_stagnations:
 987                reason_to_stop = f"limit of fails: {count_stagnation}"
 988            elif t == self.max_iterations:
 989                reason_to_stop = f'limit of iterations: {t}'
 990            elif stop_by_val(self.best_function):
 991                reason_to_stop = f"stop value reached: {self.best_function} <= {stop_when_reached}"
 992            elif time_is_done():
 993                reason_to_stop = f'time is done: {time.time() - start_time} >= {time_limit_secs}'
 994
 995            if reason_to_stop is not None:
 996                if SHOW_PROGRESS:
 997                    show_progress(t, self.max_iterations, f"GA is running... STOP! {reason_to_stop}")
 998                break
 999
1000            if scores[0] < self.best_function:  # if there is progress
1001                count_stagnation = 0
1002                self.best_function = scores[0]
1003            else:
1004                count_stagnation += 1
1005
1006            if SHOW_PROGRESS:
1007                show_progress(
1008                    t,
1009                    self.max_iterations,
1010                    f"GA is running...{t} gen from {self.max_iterations}...best value = {self.best_function}"
1011                )
1012
1013            # Select parents
1014            
1015            par: array2D = np.empty((self.parents_count, self.dim))
1016            """samples chosen to create new samples"""
1017            par_scores: array1D = np.empty(self.parents_count)
1018
1019            elit_slice = slice(None, self.elit_count)
1020            """elit parents"""
1021
1022            # copy needs because the generation will be removed after parents selection
1023            par[elit_slice] = pop[elit_slice].copy()
1024            par_scores[elit_slice] = scores[elit_slice].copy()
1025
1026            new_par_inds = (
1027                self.selection(
1028                    scores[self.elit_count:],
1029                    self.parents_count - self.elit_count
1030                ).astype(np.int16) + self.elit_count
1031            )
1032            """non-elit parents indexes"""
1033            #new_par_inds = self.selection(scores, self.parents_count - self.elit_count).astype(np.int16)
1034            par_slice = slice(self.elit_count, self.parents_count)
1035            par[par_slice] = pop[new_par_inds].copy()
1036            par_scores[par_slice] = scores[new_par_inds].copy()
1037
1038            pop = np.empty((self.population_size, self.dim))
1039            """new generation"""
1040            scores = np.empty(self.population_size)
1041            """new generation scores"""
1042
1043            parents_slice = slice(None, self.parents_count)
1044            pop[parents_slice] = par
1045            scores[parents_slice] = par_scores
1046
1047            if self.fill_children is None:  # default fill children behaviour
1048                DO_MUTATION = self.needs_mutation
1049                for k in range(self.parents_count, self.population_size, 2):
1050
1051                    r1, r2 = get_parents_inds()
1052
1053                    pvar1 = pop[r1]  # equal to par[r1], but better for cache
1054                    pvar2 = pop[r2]
1055
1056                    ch1, ch2 = self.crossover(pvar1, pvar2)
1057
1058                    if DO_MUTATION:
1059                        ch1 = self.mut(ch1)
1060                        ch2 = self.mut_middle(ch2, pvar1, pvar2)
1061
1062                    pop[k] = ch1
1063                    pop[k+1] = ch2
1064            else:  # custom behaviour
1065                self.fill_children(pop, self.parents_count)
1066
1067            if apply_function_to_parents:
1068                scores = self.set_function(pop)
1069            else:
1070                scores[self.parents_count:] = self.set_function(pop[self.parents_count:])
1071            
1072            # remove duplicates
1073            pop, scores = remover(pop, scores, t)
1074            # revolution
1075            pop, scores = revolution(pop, scores, count_stagnation)
1076
1077            # make population better
1078            args_to_sort = scores.argsort()
1079            pop = pop[args_to_sort]
1080            scores = scores[args_to_sort]
1081            self._update_report(scores)
1082
1083            # callback it
1084            total_callback(t, self.report, pop, scores)
1085            total_middle_callback()
1086
1087            t += 1
1088
1089        self.best_function = fast_min(scores[0], self.best_function)
1090
1091        last_generation = Generation(pop, scores)
1092        self.result = GAResult(last_generation)
1093
1094        if save_last_generation_as is not None:
1095            last_generation.save(save_last_generation_as)
1096
1097        if enable_printing:
1098            show = ' ' * 200
1099            sys.stdout.write(
1100                f'\r{show}\n'
1101                f'\r The best found solution:\n {pop[0]}'
1102                f'\n\n Objective function:\n {self.best_function}\n'
1103                f'\n Used generations: {len(self.report)}'
1104                f'\n Used time: {time.time() - start_time:.3g} seconds\n'
1105            )
1106            sys.stdout.flush() 
1107        
1108        if not no_plot:
1109            self.plot_results()
1110
1111        if enable_printing:
1112            if reason_to_stop is not None and 'iterations' not in reason_to_stop:
1113                sys.stdout.write(
1114                    f'\nWarning: GA is terminated because of {reason_to_stop}'
1115                )
1116
1117        return self.result
1118
1119    #endregion
1120
1121    #region PLOTTING
1122
1123    def plot_results(
1124        self,
1125        title: str = 'Genetic Algorithm',
1126        save_as: Optional[str] = None,
1127        dpi: int = 200,
1128        main_color: str = 'blue'
1129     ):
1130        """
1131        Simple plot of self.report (if not empty)
1132        """
1133        if len(self.report) == 0:
1134            sys.stdout.write("No results to plot!\n")
1135            return
1136
1137        plot_several_lines(
1138            lines=[self.report],
1139            colors=[main_color],
1140            labels=['best of generation'],
1141            linewidths=[2],
1142            title=title,
1143            xlabel='Generation',
1144            ylabel='Minimized function',
1145            save_as=save_as,
1146            dpi=dpi
1147        )
1148
1149    def plot_generation_scores(self, title: str = 'Last generation scores', save_as: Optional[str] = None):
1150        """
1151        Plots barplot of scores of last population
1152        """
1153
1154        if not hasattr(self, 'result'):
1155            raise Exception(
1156                "There is no 'result' field into ga object! Before plotting generation u need to run seaching process"
1157            )
1158
1159        plot_pop_scores(self.result.last_generation.scores, title, save_as)
1160
1161    #endregion
1162
1163    #region MUTATION
1164    def mut(self, x: array1D):
1165        """
1166        just mutation
1167        """
1168
1169        for i in self.indexes_int_mut:
1170            if random.random() < self.prob_mut_discrete:
1171                x[i] = self.discrete_mutation(x[i], *self.var_bounds[i])
1172
1173        for i in self.indexes_float_mut:                
1174            if random.random() < self.prob_mut:
1175                x[i] = self.real_mutation(x[i], *self.var_bounds[i])
1176            
1177        return x
1178
1179    def mut_middle(self, x: array1D, p1: array1D, p2: array1D):
1180        """
1181        mutation oriented on parents
1182        """
1183        for i in self.indexes_int_mut:
1184
1185            if random.random() < self.prob_mut_discrete:
1186                v1, v2 = p1[i], p2[i]
1187                if v1 < v2:
1188                    x[i] = random.randint(v1, v2)
1189                elif v1 > v2:
1190                    x[i] = random.randint(v2, v1)
1191                else:
1192                    x[i] = random.randint(*self.var_bounds[i])
1193                        
1194        for i in self.indexes_float_mut:                
1195            if random.random() < self.prob_mut:
1196                v1, v2 = p1[i], p2[i]
1197                if v1 != v2:
1198                    x[i] = random.uniform(v1, v2)
1199                else:
1200                    x[i] = random.uniform(*self.var_bounds[i])
1201        return x
1202
1203    #endregion
1204
1205    #region Set functions
1206
1207    @staticmethod
1208    def default_set_function(function_for_set: Callable[[array1D], float]):
1209        """
1210        simple function for creating set_function 
1211        function_for_set just applies to each row of population
1212        """
1213        def func(matrix: array2D):
1214            return np.array(
1215                [function_for_set(row) for row in matrix]
1216            )
1217        return func
1218
1219    @staticmethod
1220    def vectorized_set_function(function_for_set: Callable[[array1D], float]):
1221        """
1222        works like default, but faster for big populations and slower for little
1223        function_for_set just applyes to each row of population
1224        """
1225
1226        func = np.vectorize(function_for_set, signature='(n)->()')
1227        return func
1228
1229    @staticmethod
1230    def set_function_multiprocess(function_for_set: Callable[[array1D], float], n_jobs: int = -1):
1231        """
1232        like function_for_set but uses joblib with n_jobs (-1 goes to count of available processors)
1233        """
1234        try:
1235            from joblib import Parallel, delayed
1236        except ModuleNotFoundError:
1237            raise ModuleNotFoundError(
1238                "this additional feature requires joblib package," 
1239                "run `pip install joblib` or `pip install --upgrade geneticalgorithm2[full]`" 
1240                "or use another set function"
1241            )
1242
1243        def func(matrix: array2D):
1244            result = Parallel(n_jobs=n_jobs)(delayed(function_for_set)(matrix[i]) for i in range(matrix.shape[0]))
1245            return np.array(result)
1246        return func
1247            
1248    #endregion

Genetic algorithm optimization process

GeneticAlgorithm2( function: Callable[[numpy.ndarray], float] = None, dimension: int = 0, variable_type: Union[Literal['int', 'real', 'bool'], Sequence[Literal['int', 'real', 'bool']]] = 'bool', variable_boundaries: Union[numpy.ndarray, Sequence[Tuple[float, float]], NoneType] = None, variable_type_mixed=None, function_timeout: Union[float, NoneType] = None, algorithm_parameters: Union[geneticalgorithm2.data_types.algorithm_params.AlgorithmParams, Dict[str, Any]] = AlgorithmParams(max_num_iteration=None, max_iteration_without_improv=None, population_size=100, mutation_probability=0.1, mutation_discrete_probability=None, crossover_probability=None, elit_ratio=0.04, parents_portion=0.3, crossover_type='uniform', mutation_type='uniform_by_center', mutation_discrete_type='uniform_discrete', selection_type='roulette'))
 79    def __init__(
 80        self,
 81        function: FunctionToMinimize = None,
 82
 83        dimension: int = 0,
 84        variable_type: Union[VARIABLE_TYPE, Sequence[VARIABLE_TYPE]] = 'bool',
 85        variable_boundaries: Optional[Union[array2D, Sequence[Tuple[float, float]]]] = None,
 86
 87        variable_type_mixed=None,
 88
 89        function_timeout: Optional[float] = None,
 90        algorithm_parameters: Union[AlgorithmParams, Dict[str, Any]] = default_params
 91    ):
 92        """
 93        initializes the GA object and performs main checks
 94
 95        Args:
 96            function: the given objective function to be minimized -- deprecated and moved to run() method
 97            dimension: the number of decision variables, the population samples dimension
 98
 99            variable_type: string means the variable type for all variables,
100                for mixed types use sequence of strings of type for each variable
101
102            variable_boundaries: leave it None if variable_type is 'bool';
103                otherwise provide a sequence of tuples of length two as boundaries for each variable;
104                the length of the array must be equal dimension.
105                For example, ([0,100], [0,200]) determines
106                    lower boundary 0 and upper boundary 100 for first
107                    and upper boundary 200 for second variable
108                    and dimension must be 2.
109
110            variable_type_mixed -- deprecated
111
112            function_timeout: if the given function does not provide
113                output before function_timeout (unit is seconds) the algorithm raises error.
114                For example, when there is an infinite loop in the given function.
115                `None` means disabling -- deprecated and moved to run()
116
117            algorithm_parameters: AlgorithmParams object or usual dictionary with algorithm parameter;
118                it is not mandatory to provide all possible parameters
119
120        Notes:
121            - This implementation minimizes the given objective function.
122            For maximization u can multiply the function by -1 (for instance): the absolute
123                value of the output would be the actual objective function
124
125        for more details and examples of implementation please visit:
126            https://github.com/PasaOpasen/geneticalgorithm2
127  
128        """
129
130        # all default fields
131
132        # self.crossover: Callable[[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray]] = None
133        # self.real_mutation: Callable[[float, float, float], float] = None
134        # self.discrete_mutation: Callable[[int, int, int], int] = None
135        # self.selection: Callable[[np.ndarray, int], np.ndarray] = None
136
137        self.result = None
138        self.best_function = None
139
140        self.revolution_oppositor = None
141        self.dup_oppositor = None
142        self.creator = None
143        self.init_oppositors = None
144
145        self.f: Callable[[array1D], float] = None
146        self.funtimeout: float = None
147
148        self.set_function: Callable[[np.ndarray], np.ndarray] = None
149
150        # self.dim: int = None
151        self.var_bounds: List[Tuple[Union[int, float], Union[int, float]]] = None
152        # self.indexes_int: np.ndarray = None
153        # self.indexes_float: np.ndarray = None
154
155        self.checked_reports: List[Tuple[str, Callable[[array1D], None]]] = None
156
157        self.population_size: int = None
158        self.progress_stream = None
159
160        # input algorithm's parameters
161
162        assert isinstance(algorithm_parameters, (dict, AlgorithmParams)), (
163            "algorithm_parameters must be dict or AlgorithmParams object"
164        )
165        if not isinstance(algorithm_parameters, AlgorithmParams):
166            algorithm_parameters = AlgorithmParams.from_dict(algorithm_parameters)
167        algorithm_parameters.validate()
168        self.param = algorithm_parameters
169
170        self.crossover, self.real_mutation, self.discrete_mutation, self.selection = algorithm_parameters.get_CMS_funcs()
171
172        # dimension and area bounds
173        self.dim = int(dimension)
174        assert self.dim > 0, f"dimension count must be int and >0, gotten {dimension}"
175
176        if variable_type_mixed is not None:
177            warnings.warn(
178                f"argument variable_type_mixed is deprecated and will be removed at version 7\n "
179                f"use variable_type={tuple(variable_type_mixed)} instead"
180            )
181            variable_type = variable_type_mixed
182        self._set_types_indexes(variable_type)  # types indexes
183        self._set_var_boundaries(variable_type, variable_boundaries)  # input variables' boundaries
184
185        # fix mutation probs
186
187        assert can_be_prob(self.param.mutation_probability)
188        self.prob_mut = self.param.mutation_probability
189        assert self.param.mutation_discrete_probability is None or can_be_prob(self.param.mutation_discrete_probability)
190        self.prob_mut_discrete = self.param.mutation_discrete_probability or self.prob_mut
191
192        if self.param.crossover_probability is not None:
193            warnings.warn(
194                f"crossover_probability is deprecated and will be removed in version 7. "
195                f"Reason: it's old and has no sense"
196            )
197
198        #############################################################
199        # input function
200        if function:
201            warnings.warn(
202                f"function is deprecated in init constructor and will be removed in version 7. "
203                f"Move this argument to run() method"
204            )
205            self._check_function(function)
206        if function_timeout:
207            warnings.warn(
208                f"function_timeout is deprecated in init constructor and will be removed in version 7. "
209                f"Move this argument to run() method"
210            )
211            self._check_function_timeout(function_timeout)
212
213        #############################################################
214        
215        self.population_size = int(self.param.population_size)
216        self._set_parents_count(self.param.parents_portion)
217        self._set_elit_count(self.population_size, self.param.elit_ratio)
218        assert self.parents_count >= self.elit_count, (
219            f"\n number of parents ({self.parents_count}) "
220            f"must be greater than number of elits ({self.elit_count})"
221        )
222
223        self._set_max_iterations()
224
225        self._set_report()
226
227        # specify this function to speed up or change default behaviour
228        self.fill_children: Optional[Callable[[array2D, int], None]] = None
229        """
230        custom function which adds children for population POP 
231            where POP[:parents_count] are parents lines and next lines are for children
232        """

initializes the GA object and performs main checks

Arguments:
  • function: the given objective function to be minimized -- deprecated and moved to run() method
  • dimension: the number of decision variables, the population samples dimension
  • variable_type: string means the variable type for all variables, for mixed types use sequence of strings of type for each variable
  • variable_boundaries: leave it None if variable_type is 'bool'; otherwise provide a sequence of two-element tuples giving the boundaries of each variable; the length of the sequence must be equal to dimension. For example, ([0,100], [0,200]) determines lower boundary 0 and upper boundary 100 for the first variable, lower boundary 0 and upper boundary 200 for the second variable, and dimension must be 2.
  • variable_type_mixed -- deprecated
  • function_timeout: if the given function does not provide output before function_timeout (unit is seconds) the algorithm raises error. For example, when there is an infinite loop in the given function. None means disabling -- deprecated and moved to run()
  • algorithm_parameters: AlgorithmParams object or usual dictionary with algorithm parameter; it is not mandatory to provide all possible parameters
Notes:
  • This implementation minimizes the given objective function. For maximization you can multiply the function by -1 (for instance): the absolute value of the output is then the actual objective function value.

for more details and examples of implementation please visit: https://github.com/PasaOpasen/geneticalgorithm2

default_params = AlgorithmParams(max_num_iteration=None, max_iteration_without_improv=None, population_size=100, mutation_probability=0.1, mutation_discrete_probability=None, crossover_probability=None, elit_ratio=0.04, parents_portion=0.3, crossover_type='uniform', mutation_type='uniform_by_center', mutation_discrete_type='uniform_discrete', selection_type='roulette')
PROGRESS_BAR_LEN = 20

max count of symbols in the progress bar

output_dict
64    @property
65    def output_dict(self):
66        warnings.warn(
67            "'output_dict' is deprecated and will be removed at version 7 \n"
68            "use 'result' instead"
69        )
70        return self.result
needs_mutation: bool
72    @property
73    def needs_mutation(self) -> bool:
74        """whether the mutation is required"""
75        return self.prob_mut > 0 or self.prob_mut_discrete > 0

whether the mutation is required

result
best_function
revolution_oppositor
dup_oppositor
creator
init_oppositors
f: Callable[[numpy.ndarray], float]
funtimeout: float
set_function: Callable[[numpy.ndarray], numpy.ndarray]
var_bounds: List[Tuple[Union[int, float], Union[int, float]]]
checked_reports: List[Tuple[str, Callable[[numpy.ndarray], NoneType]]]
population_size: int
progress_stream
param
dim
prob_mut
prob_mut_discrete
fill_children: Union[Callable[[numpy.ndarray, int], NoneType], NoneType]

custom function which adds children for population POP where POP[:parents_count] are parents lines and next lines are for children

def run( self, no_plot: bool = False, disable_printing: bool = False, progress_bar_stream: Union[str, NoneType] = 'stdout', disable_progress_bar: bool = False, function: Callable[[numpy.ndarray], float] = None, function_timeout: Union[float, NoneType] = None, set_function: Callable[[numpy.ndarray], numpy.ndarray] = None, apply_function_to_parents: bool = False, start_generation: Union[geneticalgorithm2.data_types.generation.Generation, str, Dict[Literal['population', 'scores'], numpy.ndarray], numpy.ndarray, Tuple[Union[numpy.ndarray, NoneType], Union[numpy.ndarray, NoneType]]] = Generation(variables=None, scores=None), studEA: bool = False, mutation_indexes: Union[Iterable[int], NoneType] = None, init_creator: Union[Callable[[], numpy.ndarray], NoneType] = None, init_oppositors: Union[Sequence[Callable[[numpy.ndarray], numpy.ndarray]], NoneType] = None, duplicates_oppositor: Union[Callable[[numpy.ndarray], numpy.ndarray], NoneType] = None, remove_duplicates_generation_step: Union[int, NoneType] = None, revolution_oppositor: Union[Callable[[numpy.ndarray], numpy.ndarray], NoneType] = None, revolution_after_stagnation_step: Union[int, NoneType] = None, revolution_part: float = 0.3, population_initializer: Tuple[int, Callable[[numpy.ndarray, numpy.ndarray], Tuple[numpy.ndarray, numpy.ndarray]]] = (1, <function get_population_initializer.<locals>.process_population>), stop_when_reached: Union[float, NoneType] = None, callbacks: Union[Sequence[Callable[[int, List[float], numpy.ndarray, numpy.ndarray], NoneType]], NoneType] = None, middle_callbacks: Union[Sequence[Callable[[geneticalgorithm2.callbacks.data.MiddleCallbackData], Tuple[geneticalgorithm2.callbacks.data.MiddleCallbackData, bool]]], NoneType] = None, time_limit_secs: Union[float, NoneType] = None, save_last_generation_as: Union[str, NoneType] = None, seed: Union[int, NoneType] = None) -> geneticalgorithm2.data_types.result.GAResult:
 448    def run(
 449        self,
 450        no_plot: bool = False,
 451        disable_printing: bool = False,
 452        progress_bar_stream: Optional[str] = 'stdout',
 453
 454        # deprecated
 455        disable_progress_bar: bool = False,
 456
 457        function: FunctionToMinimize = None,
 458        function_timeout: Optional[float] = None,
 459
 460        set_function: SetFunctionToMinimize = None,
 461        apply_function_to_parents: bool = False,
 462        start_generation: GenerationConvertible = Generation(),
 463        studEA: bool = False,
 464        mutation_indexes: Optional[Iterable[int]] = None,
 465
 466        init_creator: Optional[CreatorFunc] = None,
 467        init_oppositors: Optional[Sequence[OppositorFunc]] = None,
 468
 469        duplicates_oppositor: Optional[OppositorFunc] = None,
 470        remove_duplicates_generation_step: Optional[int] = None,
 471
 472        revolution_oppositor: Optional[OppositorFunc] = None,
 473        revolution_after_stagnation_step: Optional[int] = None,
 474        revolution_part: float = 0.3,
 475
 476        population_initializer: Tuple[
 477            int, PopulationModifier
 478        ] = get_population_initializer(select_best_of=1, local_optimization_step='never', local_optimizer=None),
 479
 480        stop_when_reached: Optional[float] = None,
 481        callbacks: Optional[Sequence[SimpleCallbackFunc]] = None,
 482        middle_callbacks: Optional[Sequence[MiddleCallbackFunc]] = None,  #+
 483        time_limit_secs: Optional[float] = None,
 484        save_last_generation_as: Optional[str] = None,
 485        seed: Optional[int] = None
 486    ) -> GAResult:
 487        """
 488        runs optimization process
 489
 490        Args:
 491            no_plot: do not plot results using matplotlib by default
 492
 493            disable_printing: do not print log info of optimization process (except progress bar)
 494
 495            progress_bar_stream: 'stdout', 'stderr' or None to disable progress bar;
 496                disabling progress bar can speed up the optimization process in many cases
 497
 498            disable_progress_bar: deprecated
 499
 500            function: the given objective function (sample -> its score) to be minimized;
 501
 502            function_timeout: if the given function does not provide
 503                output before function_timeout (unit is seconds) the algorithm raises error.
 504                For example, when there is an infinite loop in the given function.
 505                `None` means disabling
 506
 507            set_function: set function (all samples -> score per sample) to be used instead of usual function
 508                (usually for optimization purposes)
 509
 510            apply_function_to_parents: whether to apply function to parents from previous generation (if it's needed), 
 511                it can be needed at working with games agents, but for other tasks will just waste time
 512
 513            start_generation: initial generation object of any `GenerationConvertible` type
 514
 515            studEA: using stud EA strategy (crossover with best object always)
 516
 517            mutation_indexes: indexes of dimensions where mutation can be performed (all dimensions by default)
 518
 519            init_creator: the function creates population samples.
 520                By default -- random uniform for real variables and random uniform for int
 521            init_oppositors: the list of oppositors creates oppositions for base population. No by default
 522
 523            duplicates_oppositor: oppositor for applying after duplicates removing.
 524                By default -- using just random initializer from creator
 525            remove_duplicates_generation_step: step for removing duplicates (have a sense with discrete tasks).
 526                No by default
 527
 528            revolution_oppositor: oppositor for revolution time. No by default
 529            revolution_after_stagnation_step: create revolution after this generations of stagnation. No by default
 530            revolution_part: float, the part of generation to being oppose. By default is 0.3
 531
 532            population_initializer: object for actions at population initialization step
 533                to create better start population. See doc
 534
 535            stop_when_reached: stop searching after reaching this value 
 536                (it can be potential minimum or something else)
 537
 538            callbacks: sequence of callback functions with structure:
 539                (generation_number, report_list, last_population, last_scores) -> do some action
 540
 541            middle_callbacks: sequence of functions made by `MiddleCallback` class
 542
 543            time_limit_secs: limit time of working (in seconds);
 544                `None` means no time limit (limit will be only for count of generation and so on)
 545
 546            save_last_generation_as: path to .npz file 
 547                for saving last_generation as numpy dictionary like
 548                {'population': 2D-array, 'scores': 1D-array}; 
 549                None disables this option
 550
 551            seed: random seed (None if doesn't matter)
 552
 553        Returns:
 554            `GAResult` object;
 555            also fills the self.report and self.result with many report information
 556
 557        Notes:
 558            - if `function_timeout` is enabled then `function` must be set
 559            - it would be more logical to use params like `studEA` as an algorithm parameter, 
 560                but `run()`-way can be more convenient for real using
 561        """
 562
 563        if disable_progress_bar:
 564            warnings.warn(
 565                f"disable_progress_bar is deprecated and will be removed in version 7, "
 566                f"use probress_bar_stream=None to disable progress bar"
 567            )
 568            progress_bar_stream = None
 569
 570        enable_printing: bool = not disable_printing
 571
 572        start_generation = Generation.from_object(self.dim, start_generation)
 573
 574        assert is_current_gen_number(revolution_after_stagnation_step), "must be None or int and >0"
 575        assert is_current_gen_number(remove_duplicates_generation_step), "must be None or int and >0"
 576        assert can_be_prob(revolution_part), f"revolution_part must be in [0,1], not {revolution_part}"
 577        assert stop_when_reached is None or isinstance(stop_when_reached, (int, float))
 578        assert isinstance(callbacks, collections.abc.Sequence) or callbacks is None, (
 579            "callbacks should be a list of callbacks functions"
 580        )
 581        assert isinstance(middle_callbacks, collections.abc.Sequence) or middle_callbacks is None, (
 582            "middle_callbacks should be list of MiddleCallbacks functions"
 583        )
 584        assert time_limit_secs is None or time_limit_secs > 0, 'time_limit_secs must be None of number > 0'
 585
 586        self._set_mutation_indexes(mutation_indexes)
 587        from OppOpPopInit import set_seed
 588        set_seed(seed)
 589
 590        # randomstate = np.random.default_rng(random.randint(0, 1000) if seed is None else seed)
 591        # self.randomstate = randomstate
 592
 593        # using bool flag is faster than using empty function with generated string arguments
 594        SHOW_PROGRESS = progress_bar_stream is not None
 595        if SHOW_PROGRESS:
 596
 597            show_progress = lambda t, t2, s: self._progress(t, t2, status=s)
 598
 599            if progress_bar_stream == 'stdout':
 600                self.progress_stream = sys.stdout
 601            elif progress_bar_stream == 'stderr':
 602                self.progress_stream = sys.stderr
 603            else:
 604                raise Exception(
 605                    f"wrong value {progress_bar_stream} of progress_bar_stream, must be 'stdout'/'stderr'/None"
 606                )
 607        else:
 608            show_progress = None
 609
 610        stop_by_val = (
 611            (lambda best_f: False)
 612            if stop_when_reached is None
 613            else (lambda best_f: best_f <= stop_when_reached)
 614        )
 615
 616        t: int = 0
 617        count_stagnation: int = 0
 618        pop: array2D = None
 619        scores: array1D = None
 620
 621        #region CALLBACKS
 622
 623        def get_data():
 624            """
 625            returns all important data about model
 626            """
 627            return MiddleCallbackData(
 628                last_generation=Generation(pop, scores),
 629                current_generation=t,
 630                report_list=self.report,
 631
 632                mutation_prob=self.prob_mut,
 633                mutation_discrete_prob=self.prob_mut_discrete,
 634                mutation=self.real_mutation,
 635                mutation_discrete=self.discrete_mutation,
 636                crossover=self.crossover,
 637                selection=self.selection,
 638
 639                current_stagnation=count_stagnation,
 640                max_stagnation=self.max_stagnations,
 641
 642                parents_portion=self.param.parents_portion,
 643                elit_ratio=self.param.elit_ratio,
 644
 645                set_function=self.set_function,
 646
 647                reason_to_stop=reason_to_stop
 648            )
 649
 650        def set_data(data: MiddleCallbackData):
 651            """
 652            sets data to model
 653            """
 654            nonlocal pop, scores, count_stagnation, reason_to_stop
 655
 656            pop, scores = data.last_generation.variables, data.last_generation.scores
 657            self.population_size = pop.shape[0]
 658
 659            self.param.parents_portion = data.parents_portion
 660            self._set_parents_count(data.parents_portion)
 661
 662            self.param.elit_ratio = data.elit_ratio
 663            self._set_elit_count(self.population_size, data.elit_ratio)
 664
 665            self.prob_mut = data.mutation_prob
 666            self.prob_mut_discrete = data.mutation_discrete_prob
 667            self.real_mutation = data.mutation
 668            self.discrete_mutation = data.mutation_discrete
 669            self.crossover = data.crossover
 670            self.selection = data.selection
 671
 672            count_stagnation = data.current_stagnation
 673            reason_to_stop = data.reason_to_stop
 674            self.max_stagnations = data.max_stagnation
 675
 676            self.set_function = data.set_function
 677
 678        if not callbacks:
 679            total_callback = lambda g, r, lp, ls: None
 680        else:
 681            def total_callback(
 682                generation_number: int,
 683                report_list: List[float],
 684                last_population: array2D,
 685                last_scores: array1D
 686            ):
 687                for cb in callbacks:
 688                    cb(generation_number, report_list, last_population, last_scores)
 689
 690        if not middle_callbacks:
 691            total_middle_callback = lambda: None
 692        else:
 693            def total_middle_callback():
 694                """
 695                applies callbacks and sets new data if there is a sense
 696                """
 697                data = get_data()
 698                flag = False
 699                for cb in middle_callbacks:
 700                    data, has_sense = cb(data)
 701                    if has_sense:
 702                        flag = True
 703                if flag:
 704                    set_data(data)  # update global date if there was real callback step
 705
 706        #endregion
 707
 708        start_time = time.time()
 709        time_is_done = (
 710            (lambda: False)
 711            if time_limit_secs is None
 712            else (lambda: int(time.time() - start_time) >= time_limit_secs)
 713        )
 714
 715        # combine with deprecated parts
 716        function = function or self.f
 717        function_timeout = function_timeout or self.funtimeout
 718
 719        assert function or set_function, 'no function to minimize'
 720        if function:
 721            self._check_function(function)
 722        if function_timeout:
 723            self._check_function_timeout(function_timeout)
 724
 725        self.set_function = set_function or GeneticAlgorithm2.default_set_function(self.f)
 726
 727        #region Initial population, duplicates filter, revolutionary
 728
 729        pop_coef, initializer_func = population_initializer
 730        
 731        # population creator by random or with oppositions
 732        if init_creator is None:
 733
 734            from OppOpPopInit import SampleInitializers
 735
 736            # just uniform random
 737            self.creator = SampleInitializers.Combined(
 738                minimums=[v[0] for v in self.var_bounds],
 739                maximums=[v[1] for v in self.var_bounds],
 740                indexes=(
 741                    self.indexes_int,
 742                    self.indexes_float
 743                ),
 744                creator_initializers=(
 745                    SampleInitializers.RandomInteger,
 746                    SampleInitializers.Uniform
 747                )
 748            )
 749        else:
 750            assert callable(init_creator)
 751            self.creator = init_creator
 752
 753        self.init_oppositors = init_oppositors
 754        self.dup_oppositor = duplicates_oppositor
 755        self.revolution_oppositor = revolution_oppositor
 756
 757        # event for removing duplicates
 758        if remove_duplicates_generation_step is None:
 759            def remover(pop: array2D, scores: array1D, gen: int) -> Tuple[
 760                array2D,
 761                array1D
 762            ]:
 763                return pop, scores
 764        else:
 765            
 766            def without_dup(pop: array2D, scores: array1D):  # returns population without dups
 767                _, index_of_dups = np.unique(pop, axis=0, return_index=True)
 768                return pop[index_of_dups], scores[index_of_dups], scores.size - index_of_dups.size
 769            
 770            if self.dup_oppositor is None:  # if there is no dup_oppositor, use random creator
 771                def remover(pop: array2D, scores: array1D, gen: int) -> Tuple[
 772                    array2D,
 773                    array1D
 774                ]:
 775                    if gen % remove_duplicates_generation_step != 0:
 776                        return pop, scores
 777
 778                    pp, sc, count_to_create = without_dup(pop, scores)  # pop without dups
 779                    
 780                    if count_to_create == 0:
 781                        if SHOW_PROGRESS:
 782                            show_progress(t, self.max_iterations,
 783                                      f"GA is running...{t} gen from {self.max_iterations}. No dups!")
 784                        return pop, scores
 785
 786                    pp2 = SampleInitializers.CreateSamples(self.creator, count_to_create)  # new pop elements
 787                    pp2_scores = self.set_function(pp2)  # new elements values
 788
 789                    if SHOW_PROGRESS:
 790                        show_progress(t, self.max_iterations,
 791                                      f"GA is running...{t} gen from {self.max_iterations}. Kill dups!")
 792                    
 793                    new_pop = np.vstack((pp, pp2))
 794                    new_scores = np.concatenate((sc, pp2_scores))
 795
 796                    args_to_sort = new_scores.argsort()
 797                    return new_pop[args_to_sort], new_scores[args_to_sort]
 798
 799            else:  # using oppositors
 800                assert callable(self.dup_oppositor)
 801
 802                def remover(pop: np.ndarray, scores: np.ndarray, gen: int) -> Tuple[
 803                    np.ndarray,
 804                    np.ndarray
 805                ]:
 806                    if gen % remove_duplicates_generation_step != 0:
 807                        return pop, scores
 808
 809                    pp, sc, count_to_create = without_dup(pop, scores)  # pop without dups
 810
 811                    if count_to_create == 0:
 812                        if SHOW_PROGRESS:
 813                            show_progress(t, self.max_iterations,
 814                                          f"GA is running...{t} gen from {self.max_iterations}. No dups!")
 815                        return pop, scores
 816
 817                    if count_to_create > sc.size:
 818                        raise Exception(
 819                            f"Too many duplicates at generation {gen} ({count_to_create} > {sc.size}), cannot oppose"
 820                        )
 821
 822                    # oppose count_to_create worse elements
 823                    pp2 = OppositionOperators.Reflect(pp[-count_to_create:], self.dup_oppositor)  # new pop elements
 824                    pp2_scores = self.set_function(pp2)  # new elements values
 825
 826                    if SHOW_PROGRESS:
 827                        show_progress(t, self.max_iterations,
 828                                      f"GA is running...{t} gen from {self.max_iterations}. Kill dups!")
 829
 830                    new_pop = np.vstack((pp, pp2))
 831                    new_scores = np.concatenate((sc, pp2_scores))
 832
 833                    args_to_sort = new_scores.argsort()
 834                    return new_pop[args_to_sort], new_scores[args_to_sort]
 835
 836        # event for revolution
 837        if revolution_after_stagnation_step is None:
 838            def revolution(pop: array2D, scores: array1D, stagnation_count: int) -> Tuple[
 839                array2D,
 840                array1D
 841            ]:
 842                return pop, scores
 843        else:
 844            if revolution_oppositor is None:
 845                raise Exception(
 846                    f"How can I make revolution each {revolution_after_stagnation_step} stagnation steps "
 847                    f"if revolution_oppositor is None (not defined)?"
 848                )
 849            assert callable(revolution_oppositor)
 850
 851            from OppOpPopInit import OppositionOperators
 852            
 853            def revolution(pop: array2D, scores: array1D, stagnation_count: int) -> Tuple[
 854                array2D,
 855                array1D
 856            ]:
 857                if stagnation_count < revolution_after_stagnation_step:
 858                    return pop, scores
 859                part = int(pop.shape[0]*revolution_part)
 860                
 861                pp2 = OppositionOperators.Reflect(pop[-part:], self.revolution_oppositor)
 862                pp2_scores = self.set_function(pp2)
 863
 864                combined = np.vstack((pop, pp2))
 865                combined_scores = np.concatenate((scores, pp2_scores))
 866                args = combined_scores.argsort()
 867
 868                if SHOW_PROGRESS:
 869                    show_progress(t, self.max_iterations,
 870                                  f"GA is running...{t} gen from {self.max_iterations}. Revolution!")
 871
 872                args = args[:scores.size]
 873                return combined[args], combined_scores[args]
 874
 875        #enregion
 876
 877        #
 878        #
 879        #  START ALGORITHM LOGIC
 880        #
 881        #
 882
 883        # Report
 884        self._clear_report()  # clear old report objects
 885        self._init_report()
 886
 887        # initialization of pop
 888        
 889        if start_generation.variables is None:
 890
 891            from OppOpPopInit import init_population
 892
 893            real_pop_size = self.population_size * pop_coef
 894
 895            # pop = np.empty((real_pop_size, self.dim))
 896            scores = np.empty(real_pop_size)
 897            
 898            pop = init_population(
 899                samples_count=real_pop_size,
 900                creator=self.creator,
 901                oppositors=self.init_oppositors
 902            )
 903
 904            if self.funtimeout and self.funtimeout > 0:  # perform simulation
 905            
 906                time_counter = 0
 907
 908                for p in range(0, real_pop_size):
 909                    # simulation returns exception or func value -- check the time of evaluating
 910                    value, eval_time = self._simulate(pop[p])
 911                    scores[p] = value
 912                    time_counter += eval_time
 913
 914                if enable_printing:
 915                    print(
 916                        f"\nSim: Average time of function evaluating (secs): "
 917                        f"{time_counter/real_pop_size} (total = {time_counter})\n"
 918                    )
 919            else:
 920
 921                eval_time = time.time()
 922                scores = self.set_function(pop)
 923                eval_time = time.time() - eval_time
 924                if enable_printing:
 925                    print(
 926                        f"\nSet: Average time of function evaluating (secs): "
 927                        f"{eval_time/real_pop_size} (total = {eval_time})\n"
 928                    )
 929                
 930        else:
 931
 932            self.population_size = start_generation.variables.shape[0]
 933            self._set_elit_count(self.population_size, self.param.elit_ratio)
 934            self._set_parents_count(self.param.parents_portion)
 935
 936            pop = start_generation.variables
 937
 938            if start_generation.scores is None:
 939
 940                _time = time.time()
 941                scores = self.set_function(pop)
 942                _time = time.time() - _time
 943
 944                if enable_printing:
 945                    print(
 946                        f'\nFirst scores are made from gotten variables '
 947                        f'(by {_time} secs, about {_time/scores.size} for each creature)\n'
 948                    )
 949            else:
 950                scores = start_generation.scores
 951                if enable_printing:
 952                    print(f"\nFirst scores are from gotten population\n")
 953
 954        # Initialization by select bests and local_descent
 955        
 956        pop, scores = initializer_func(pop, scores)
 957
 958        # first sort
 959        args_to_sort = scores.argsort()
 960        pop = pop[args_to_sort]
 961        scores = scores[args_to_sort]
 962        self._update_report(scores)
 963
 964        self.population_size = scores.size
 965        self.best_function = scores[0]
 966
 967        if enable_printing:
 968            print(
 969                f"Best score before optimization: {self.best_function}"
 970            )
 971
 972        t: int = 1
 973        count_stagnation: int = 0
 974        """iterations without progress"""
 975        reason_to_stop: Optional[str] = None
 976
 977        # gets indexes of 2 parents to crossover
 978        if studEA:
 979            get_parents_inds = lambda: (0, random.randrange(1, self.parents_count))
 980        else:
 981            get_parents_inds = lambda: random_indexes_pair(self.parents_count)
 982
 983        #  while no reason to stop
 984        while True:
 985
 986            if count_stagnation > self.max_stagnations:
 987                reason_to_stop = f"limit of fails: {count_stagnation}"
 988            elif t == self.max_iterations:
 989                reason_to_stop = f'limit of iterations: {t}'
 990            elif stop_by_val(self.best_function):
 991                reason_to_stop = f"stop value reached: {self.best_function} <= {stop_when_reached}"
 992            elif time_is_done():
 993                reason_to_stop = f'time is done: {time.time() - start_time} >= {time_limit_secs}'
 994
 995            if reason_to_stop is not None:
 996                if SHOW_PROGRESS:
 997                    show_progress(t, self.max_iterations, f"GA is running... STOP! {reason_to_stop}")
 998                break
 999
1000            if scores[0] < self.best_function:  # if there is progress
1001                count_stagnation = 0
1002                self.best_function = scores[0]
1003            else:
1004                count_stagnation += 1
1005
1006            if SHOW_PROGRESS:
1007                show_progress(
1008                    t,
1009                    self.max_iterations,
1010                    f"GA is running...{t} gen from {self.max_iterations}...best value = {self.best_function}"
1011                )
1012
1013            # Select parents
1014            
1015            par: array2D = np.empty((self.parents_count, self.dim))
1016            """samples chosen to create new samples"""
1017            par_scores: array1D = np.empty(self.parents_count)
1018
1019            elit_slice = slice(None, self.elit_count)
1020            """elit parents"""
1021
1022            # copy needs because the generation will be removed after parents selection
1023            par[elit_slice] = pop[elit_slice].copy()
1024            par_scores[elit_slice] = scores[elit_slice].copy()
1025
1026            new_par_inds = (
1027                self.selection(
1028                    scores[self.elit_count:],
1029                    self.parents_count - self.elit_count
1030                ).astype(np.int16) + self.elit_count
1031            )
1032            """non-elit parents indexes"""
1033            #new_par_inds = self.selection(scores, self.parents_count - self.elit_count).astype(np.int16)
1034            par_slice = slice(self.elit_count, self.parents_count)
1035            par[par_slice] = pop[new_par_inds].copy()
1036            par_scores[par_slice] = scores[new_par_inds].copy()
1037
1038            pop = np.empty((self.population_size, self.dim))
1039            """new generation"""
1040            scores = np.empty(self.population_size)
1041            """new generation scores"""
1042
1043            parents_slice = slice(None, self.parents_count)
1044            pop[parents_slice] = par
1045            scores[parents_slice] = par_scores
1046
1047            if self.fill_children is None:  # default fill children behaviour
1048                DO_MUTATION = self.needs_mutation
1049                for k in range(self.parents_count, self.population_size, 2):
1050
1051                    r1, r2 = get_parents_inds()
1052
1053                    pvar1 = pop[r1]  # equal to par[r1], but better for cache
1054                    pvar2 = pop[r2]
1055
1056                    ch1, ch2 = self.crossover(pvar1, pvar2)
1057
1058                    if DO_MUTATION:
1059                        ch1 = self.mut(ch1)
1060                        ch2 = self.mut_middle(ch2, pvar1, pvar2)
1061
1062                    pop[k] = ch1
1063                    pop[k+1] = ch2
1064            else:  # custom behaviour
1065                self.fill_children(pop, self.parents_count)
1066
1067            if apply_function_to_parents:
1068                scores = self.set_function(pop)
1069            else:
1070                scores[self.parents_count:] = self.set_function(pop[self.parents_count:])
1071            
1072            # remove duplicates
1073            pop, scores = remover(pop, scores, t)
1074            # revolution
1075            pop, scores = revolution(pop, scores, count_stagnation)
1076
1077            # make population better
1078            args_to_sort = scores.argsort()
1079            pop = pop[args_to_sort]
1080            scores = scores[args_to_sort]
1081            self._update_report(scores)
1082
1083            # callback it
1084            total_callback(t, self.report, pop, scores)
1085            total_middle_callback()
1086
1087            t += 1
1088
1089        self.best_function = fast_min(scores[0], self.best_function)
1090
1091        last_generation = Generation(pop, scores)
1092        self.result = GAResult(last_generation)
1093
1094        if save_last_generation_as is not None:
1095            last_generation.save(save_last_generation_as)
1096
1097        if enable_printing:
1098            show = ' ' * 200
1099            sys.stdout.write(
1100                f'\r{show}\n'
1101                f'\r The best found solution:\n {pop[0]}'
1102                f'\n\n Objective function:\n {self.best_function}\n'
1103                f'\n Used generations: {len(self.report)}'
1104                f'\n Used time: {time.time() - start_time:.3g} seconds\n'
1105            )
1106            sys.stdout.flush() 
1107        
1108        if not no_plot:
1109            self.plot_results()
1110
1111        if enable_printing:
1112            if reason_to_stop is not None and 'iterations' not in reason_to_stop:
1113                sys.stdout.write(
1114                    f'\nWarning: GA is terminated because of {reason_to_stop}'
1115                )
1116
1117        return self.result

runs optimization process

Arguments:
  • no_plot: do not plot results using matplotlib by default
  • disable_printing: do not print log info of optimization process (except progress bar)
  • progress_bar_stream: 'stdout', 'stderr' or None to disable progress bar; disabling progress bar can speed up the optimization process in many cases
  • disable_progress_bar: deprecated
  • function: the given objective function (sample -> its score) to be minimized;
  • function_timeout: if the given function does not provide output before function_timeout (unit is seconds) the algorithm raises error. For example, when there is an infinite loop in the given function. None means disabling
  • set_function: set function (all samples -> score per sample) to be used instead of usual function (usually for optimization purposes)
  • apply_function_to_parents: whether to apply function to parents from previous generation (if it's needed), it can be needed at working with games agents, but for other tasks will just waste time
  • start_generation: initial generation object of any GenerationConvertible type
  • studEA: using stud EA strategy (crossover with best object always)
  • mutation_indexes: indexes of dimensions where mutation can be performed (all dimensions by default)
  • init_creator: the function that creates population samples. By default -- random uniform for real variables and random uniform integers for int variables
  • init_oppositors: the list of oppositors that create oppositions for the base population. Not used by default
  • duplicates_oppositor: oppositor for applying after duplicates removing. By default -- using just random initializer from creator
  • remove_duplicates_generation_step: step for removing duplicates (have a sense with discrete tasks). No by default
  • revolution_oppositor: oppositor for revolution time. No by default
  • revolution_after_stagnation_step: create a revolution after this many generations of stagnation. Disabled by default
  • revolution_part: float, the part of the generation to be opposed. By default is 0.3
  • population_initializer: object for actions at population initialization step to create better start population. See doc
  • stop_when_reached: stop searching after reaching this value (it can be potential minimum or something else)
  • callbacks: sequence of callback functions with structure: (generation_number, report_list, last_population, last_scores) -> do some action
  • middle_callbacks: sequence of functions made by MiddleCallback class
  • time_limit_secs: limit time of working (in seconds); None means no time limit (limit will be only for count of generation and so on)
  • save_last_generation_as: path to .npz file for saving last_generation as numpy dictionary like {'population': 2D-array, 'scores': 1D-array}; None disables this option
  • seed: random seed (None if doesn't matter)
Returns:

GAResult object; also fills the self.report and self.result with many report information

Notes:
  • if function_timeout is enabled then function must be set
  • it would be more logical to use params like studEA as an algorithm parameter, but run()-way can be more convenient for real using
def plot_results( self, title: str = 'Genetic Algorithm', save_as: Union[str, NoneType] = None, dpi: int = 200, main_color: str = 'blue'):
1123    def plot_results(
1124        self,
1125        title: str = 'Genetic Algorithm',
1126        save_as: Optional[str] = None,
1127        dpi: int = 200,
1128        main_color: str = 'blue'
1129     ):
1130        """
1131        Simple plot of self.report (if not empty)
1132        """
1133        if len(self.report) == 0:
1134            sys.stdout.write("No results to plot!\n")
1135            return
1136
1137        plot_several_lines(
1138            lines=[self.report],
1139            colors=[main_color],
1140            labels=['best of generation'],
1141            linewidths=[2],
1142            title=title,
1143            xlabel='Generation',
1144            ylabel='Minimized function',
1145            save_as=save_as,
1146            dpi=dpi
1147        )

Simple plot of self.report (if not empty)

def plot_generation_scores( self, title: str = 'Last generation scores', save_as: Union[str, NoneType] = None):
1149    def plot_generation_scores(self, title: str = 'Last generation scores', save_as: Optional[str] = None):
1150        """
1151        Plots barplot of scores of last population
1152        """
1153
1154        if not hasattr(self, 'result'):
1155            raise Exception(
1156                "There is no 'result' field into ga object! Before plotting generation u need to run seaching process"
1157            )
1158
1159        plot_pop_scores(self.result.last_generation.scores, title, save_as)

Plots barplot of scores of last population

def mut(self, x: numpy.ndarray):
1164    def mut(self, x: array1D):
1165        """
1166        just mutation
1167        """
1168
1169        for i in self.indexes_int_mut:
1170            if random.random() < self.prob_mut_discrete:
1171                x[i] = self.discrete_mutation(x[i], *self.var_bounds[i])
1172
1173        for i in self.indexes_float_mut:                
1174            if random.random() < self.prob_mut:
1175                x[i] = self.real_mutation(x[i], *self.var_bounds[i])
1176            
1177        return x

just mutation

def mut_middle(self, x: numpy.ndarray, p1: numpy.ndarray, p2: numpy.ndarray):
1179    def mut_middle(self, x: array1D, p1: array1D, p2: array1D):
1180        """
1181        mutation oriented on parents
1182        """
1183        for i in self.indexes_int_mut:
1184
1185            if random.random() < self.prob_mut_discrete:
1186                v1, v2 = p1[i], p2[i]
1187                if v1 < v2:
1188                    x[i] = random.randint(v1, v2)
1189                elif v1 > v2:
1190                    x[i] = random.randint(v2, v1)
1191                else:
1192                    x[i] = random.randint(*self.var_bounds[i])
1193                        
1194        for i in self.indexes_float_mut:                
1195            if random.random() < self.prob_mut:
1196                v1, v2 = p1[i], p2[i]
1197                if v1 != v2:
1198                    x[i] = random.uniform(v1, v2)
1199                else:
1200                    x[i] = random.uniform(*self.var_bounds[i])
1201        return x

mutation oriented on parents

@staticmethod
def default_set_function(function_for_set: Callable[[numpy.ndarray], float]):
1207    @staticmethod
1208    def default_set_function(function_for_set: Callable[[array1D], float]):
1209        """
1210        simple function for creating set_function 
1211        function_for_set just applies to each row of population
1212        """
1213        def func(matrix: array2D):
1214            return np.array(
1215                [function_for_set(row) for row in matrix]
1216            )
1217        return func

simple function for creating set_function: function_for_set is just applied to each row of the population

@staticmethod
def vectorized_set_function(function_for_set: Callable[[numpy.ndarray], float]):
1219    @staticmethod
1220    def vectorized_set_function(function_for_set: Callable[[array1D], float]):
1221        """
1222        works like default, but faster for big populations and slower for little
1223        function_for_set just applyes to each row of population
1224        """
1225
1226        func = np.vectorize(function_for_set, signature='(n)->()')
1227        return func

works like default, but faster for big populations and slower for small ones; function_for_set is just applied to each row of the population

@staticmethod
def set_function_multiprocess(function_for_set: Callable[[numpy.ndarray], float], n_jobs: int = -1):
1229    @staticmethod
1230    def set_function_multiprocess(function_for_set: Callable[[array1D], float], n_jobs: int = -1):
1231        """
1232        like function_for_set but uses joblib with n_jobs (-1 goes to count of available processors)
1233        """
1234        try:
1235            from joblib import Parallel, delayed
1236        except ModuleNotFoundError:
1237            raise ModuleNotFoundError(
1238                "this additional feature requires joblib package," 
1239                "run `pip install joblib` or `pip install --upgrade geneticalgorithm2[full]`" 
1240                "or use another set function"
1241            )
1242
1243        def func(matrix: array2D):
1244            result = Parallel(n_jobs=n_jobs)(delayed(function_for_set)(matrix[i]) for i in range(matrix.shape[0]))
1245            return np.array(result)
1246        return func

like function_for_set but uses joblib with n_jobs (-1 goes to count of available processors)