Edit on GitHub

geneticalgorithm2.geneticalgorithm2

   1from typing import Callable, List, Tuple, Optional, Dict, Any, Union, Sequence, Literal, Iterable
   2from typing_extensions import TypeAlias
   3
   4import collections
   5import warnings
   6import operator
   7
   8import sys
   9import time
  10import random
  11import math
  12
  13import numpy as np
  14
  15from OppOpPopInit.initialiser import CreatorFunc
  16from OppOpPopInit.oppositor import OppositorFunc
  17
  18#region INTERNAL IMPORTS
  19
  20from .utils.aliases import array1D, array2D
  21
  22from .data_types.aliases import FunctionToMinimize, SetFunctionToMinimize
  23from .data_types.algorithm_params import AlgorithmParams
  24from .data_types.generation import GenerationConvertible, Generation
  25from .data_types.result import GAResult
  26
  27from .population_initializer import get_population_initializer, PopulationModifier
  28from .utils.plotting import plot_pop_scores, plot_several_lines
  29
  30from .utils.funcs import can_be_prob, is_numpy, is_current_gen_number, fast_min, random_indexes_pair
  31
  32from .callbacks.data import MiddleCallbackData
  33from .callbacks import MiddleCallbackFunc, SimpleCallbackFunc
  34
  35#endregion
  36
  37#region ALIASES
  38
  39VARIABLE_TYPE: TypeAlias = Literal['int', 'real', 'bool']
  40"""
  41the variable type for a given or all dimension, determines the values discretion:
  42    real: double numbers
  43    int: integer number only
  44    bool: in the fact is integer with bounds [0, 1]
  45"""
  46
  47#endregion
  48
  49
  50class GeneticAlgorithm2:
  51    """
  52    Genetic algorithm optimization process
  53    """
  54    
  55    default_params = AlgorithmParams()
  56    PROGRESS_BAR_LEN = 20
  57    """max count of symbols in the progress bar"""
  58
  59    @property
  60    def output_dict(self):
  61        warnings.warn(
  62            "'output_dict' is deprecated and will be removed at version 7 \n"
  63            "use 'result' instead"
  64        )
  65        return self.result
  66
  67    @property
  68    def needs_mutation(self) -> bool:
  69        """whether the mutation is required"""
  70        return self.prob_mut > 0 or self.prob_mut_discrete > 0
  71
  72    #region INIT
  73
  74    def __init__(
  75        self,
  76        function: FunctionToMinimize = None,
  77
  78        dimension: int = 0,
  79        variable_type: Union[VARIABLE_TYPE, Sequence[VARIABLE_TYPE]] = 'bool',
  80        variable_boundaries: Optional[Union[array2D, Sequence[Tuple[float, float]]]] = None,
  81
  82        variable_type_mixed=None,
  83
  84        function_timeout: Optional[float] = None,
  85        algorithm_parameters: Union[AlgorithmParams, Dict[str, Any]] = default_params
  86    ):
  87        """
  88        initializes the GA object and performs main checks
  89
  90        Args:
  91            function: the given objective function to be minimized -- deprecated and moved to run() method
  92            dimension: the number of decision variables, the population samples dimension
  93
  94            variable_type: string means the variable type for all variables,
  95                for mixed types use sequence of strings of type for each variable
  96
  97            variable_boundaries: leave it None if variable_type is 'bool';
  98                otherwise provide a sequence of tuples of length two as boundaries for each variable;
  99                the length of the array must be equal dimension.
 100                For example, ([0,100], [0,200]) determines
 101                    lower boundary 0 and upper boundary 100 for first
 102                    and upper boundary 200 for second variable
 103                    and dimension must be 2.
 104
 105            variable_type_mixed -- deprecated
 106
 107            function_timeout: if the given function does not provide
 108                output before function_timeout (unit is seconds) the algorithm raises error.
 109                For example, when there is an infinite loop in the given function.
 110                `None` means disabling -- deprecated and moved to run()
 111
 112            algorithm_parameters: AlgorithmParams object or usual dictionary with algorithm parameter;
 113                it is not mandatory to provide all possible parameters
 114
 115        Notes:
 116            - This implementation minimizes the given objective function.
 117            For maximization u can multiply the function by -1 (for instance): the absolute
 118                value of the output would be the actual objective function
 119
 120        for more details and examples of implementation please visit:
 121            https://github.com/PasaOpasen/geneticalgorithm2
 122  
 123        """
 124
 125        # all default fields
 126
 127        # self.crossover: Callable[[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray]] = None
 128        # self.real_mutation: Callable[[float, float, float], float] = None
 129        # self.discrete_mutation: Callable[[int, int, int], int] = None
 130        # self.selection: Callable[[np.ndarray, int], np.ndarray] = None
 131
 132        self.result = None
 133        self.best_function = None
 134
 135        self.revolution_oppositor = None
 136        self.dup_oppositor = None
 137        self.creator = None
 138        self.init_oppositors = None
 139
 140        self.f: Callable[[array1D], float] = None
 141        self.funtimeout: float = None
 142
 143        self.set_function: Callable[[np.ndarray], np.ndarray] = None
 144
 145        # self.dim: int = None
 146        self.var_bounds: List[Tuple[Union[int, float], Union[int, float]]] = None
 147        # self.indexes_int: np.ndarray = None
 148        # self.indexes_float: np.ndarray = None
 149
 150        self.checked_reports: List[Tuple[str, Callable[[array1D], None]]] = None
 151
 152        self.population_size: int = None
 153        self.progress_stream = None
 154
 155        # input algorithm's parameters
 156
 157        assert isinstance(algorithm_parameters, (dict, AlgorithmParams)), (
 158            "algorithm_parameters must be dict or AlgorithmParams object"
 159        )
 160        if not isinstance(algorithm_parameters, AlgorithmParams):
 161            algorithm_parameters = AlgorithmParams.from_dict(algorithm_parameters)
 162        algorithm_parameters.validate()
 163        self.param = algorithm_parameters
 164
 165        self.crossover, self.real_mutation, self.discrete_mutation, self.selection = algorithm_parameters.get_CMS_funcs()
 166
 167        # dimension and area bounds
 168        self.dim = int(dimension)
 169        assert self.dim > 0, f"dimension count must be int and >0, gotten {dimension}"
 170
 171        if variable_type_mixed is not None:
 172            warnings.warn(
 173                f"argument variable_type_mixed is deprecated and will be removed at version 7\n "
 174                f"use variable_type={tuple(variable_type_mixed)} instead"
 175            )
 176            variable_type = variable_type_mixed
 177        self._set_types_indexes(variable_type)  # types indexes
 178        self._set_var_boundaries(variable_type, variable_boundaries)  # input variables' boundaries
 179
 180        # fix mutation probs
 181
 182        assert can_be_prob(self.param.mutation_probability)
 183        self.prob_mut = self.param.mutation_probability
 184        assert self.param.mutation_discrete_probability is None or can_be_prob(self.param.mutation_discrete_probability)
 185        self.prob_mut_discrete = self.param.mutation_discrete_probability or self.prob_mut
 186
 187        if self.param.crossover_probability is not None:
 188            warnings.warn(
 189                f"crossover_probability is deprecated and will be removed in version 7. "
 190                f"Reason: it's old and has no sense"
 191            )
 192
 193        #############################################################
 194        # input function
 195        if function:
 196            warnings.warn(
 197                f"function is deprecated in init constructor and will be removed in version 7. "
 198                f"Move this argument to run() method"
 199            )
 200            self._check_function(function)
 201        if function_timeout:
 202            warnings.warn(
 203                f"function_timeout is deprecated in init constructor and will be removed in version 7. "
 204                f"Move this argument to run() method"
 205            )
 206            self._check_function_timeout(function_timeout)
 207
 208        #############################################################
 209        
 210        self.population_size = int(self.param.population_size)
 211        self._set_parents_count(self.param.parents_portion)
 212        self._set_elit_count(self.population_size, self.param.elit_ratio)
 213        assert self.parents_count >= self.elit_count, (
 214            f"\n number of parents ({self.parents_count}) "
 215            f"must be greater than number of elits ({self.elit_count})"
 216        )
 217
 218        self._set_max_iterations()
 219
 220        self._set_report()
 221
 222        # specify this function to speed up or change default behaviour
 223        self.fill_children: Optional[Callable[[array2D, int], None]] = None
 224        """
 225        custom function which adds children for population POP 
 226            where POP[:parents_count] are parents lines and next lines are for children
 227        """
 228
 229    def _set_types_indexes(self, variable_type: Union[str, Sequence[str]]):
 230
 231        indexes = np.arange(self.dim)
 232        self.indexes_int = np.array([])
 233        self.indexes_float = np.array([])
 234
 235        assert_message = (
 236            f"\n variable_type must be 'bool', 'int', 'real' or a sequence with 'int' and 'real', got {variable_type}"
 237        )
 238
 239        if isinstance(variable_type, str):
 240            assert (variable_type in VARIABLE_TYPE.__args__), assert_message
 241            if variable_type == 'real':
 242                self.indexes_float = indexes
 243            else:
 244                self.indexes_int = indexes
 245
 246        else:  # sequence case
 247
 248            assert len(variable_type) == self.dim, (
 249                f"\n variable_type must have a length equal dimension. "
 250                f"Should be {self.dim}, got {len(variable_type)}"
 251            )
 252            assert 'bool' not in variable_type, (
 253                "don't use 'bool' if variable_type is a sequence, "
 254                "for 'boolean' case use 'int' and specify boundary as (0,1)"
 255            )
 256            assert all(v in VARIABLE_TYPE.__args__ for v in variable_type), assert_message
 257
 258            vartypes = np.array(variable_type)
 259            self.indexes_int = indexes[vartypes == 'int']
 260            self.indexes_float = indexes[vartypes == 'real']
 261
 262    def _set_var_boundaries(
 263        self,
 264        variable_type: Union[str, Sequence[str]],
 265        variable_boundaries
 266    ):
 267        if isinstance(variable_type, str) and variable_type == 'bool':
 268            self.var_bounds = [(0, 1)] * self.dim
 269        else:
 270
 271            if is_numpy(variable_boundaries):
 272                assert variable_boundaries.shape == (self.dim, 2), (
 273                    f"\n if variable_boundaries is numpy array, it must be with shape (dim, 2)"
 274                )
 275            else:
 276                assert len(variable_boundaries) == self.dim and all((len(t) == 2 for t in variable_boundaries)), (
 277                    "\n if variable_boundaries is sequence, "
 278                    "it must be with len dim and boundary for each variable must be a tuple of length two"
 279                )
 280
 281            for i in variable_boundaries:
 282                assert i[0] <= i[1], "\n lower_boundaries must be smaller than upper_boundaries [lower,upper]"
 283
 284            self.var_bounds = [(i[0], i[1]) for i in variable_boundaries]
 285
 286    def _set_parents_count(self, parents_portion: float):
 287
 288        self.parents_count = int(parents_portion * self.population_size)
 289        assert self.population_size >= self.parents_count > 1, (
 290            f'parents count {self.parents_count} cannot be less than population size {self.population_size}'
 291        )
 292        trl = self.population_size - self.parents_count
 293        if trl % 2 != 0:
 294            self.parents_count += 1
 295
 296    def _set_elit_count(self, pop_size: int, elit_ratio: Union[float, int]):
 297
 298        assert elit_ratio >= 0
 299        self.elit_count = elit_ratio if isinstance(elit_ratio, str) else math.ceil(pop_size*elit_ratio)
 300
 301    def _set_max_iterations(self):
 302
 303        if self.param.max_num_iteration is None:
 304            iterate = 0
 305            for i in range(0, self.dim):
 306                bound_min, bound_max = self.var_bounds[i]
 307                var_space = bound_max - bound_min
 308                if i in self.indexes_int:
 309                    iterate += var_space * self.dim * (100 / self.population_size)
 310                else:
 311                    iterate += var_space * 50 * (100 / self.population_size)
 312            iterate = int(iterate)
 313            if (iterate * self.population_size) > 10000000:
 314                iterate = 10000000 / self.population_size
 315
 316            self.max_iterations = fast_min(iterate, 8000)
 317        else:
 318            assert self.param.max_num_iteration > 0
 319            self.max_iterations = math.ceil(self.param.max_num_iteration)
 320
 321        max_it = self.param.max_iteration_without_improv
 322        if max_it is None:
 323            self.max_stagnations = self.max_iterations + 1
 324        else:
 325            self.max_stagnations = math.ceil(max_it)
 326
 327    def _check_function(self, function: Callable[[array1D], float]):
 328        assert callable(function), "function must be callable!"
 329        self.f = function
 330
 331    def _check_function_timeout(self, function_timeout: Optional[float]):
 332
 333        if function_timeout is not None and function_timeout > 0:
 334            try:
 335                from func_timeout import func_timeout, FunctionTimedOut
 336            except ModuleNotFoundError:
 337                raise ModuleNotFoundError(
 338                    "function_timeout > 0 needs additional package func_timeout\n"
 339                    "run `python -m pip install func_timeout`\n"
 340                    "or disable this parameter: function_timeout=None"
 341                )
 342
 343        self.funtimeout = None if function_timeout is None else float(function_timeout)
 344
 345    #endregion
 346
 347    #region REPORT
 348
 349    def _set_report(self):
 350        """
 351        creates default report checker
 352        """
 353        self.checked_reports = [
 354            # item 0 cuz scores will be sorted and min item is items[0]
 355            ('report', operator.itemgetter(0))
 356        ]
 357
 358    def _clear_report(self):
 359        """
 360        removes all report objects
 361        """
 362        fields = [f for f in vars(self).keys() if f.startswith('report')]
 363        for attr in fields:
 364            delattr(self, attr)
 365
 366    def _init_report(self):
 367        """
 368        makes empty report fields
 369        """
 370        for name, _ in self.checked_reports:
 371            setattr(self, name, [])
 372
 373    def _update_report(self, scores: array1D):
 374        """
 375        append report value to the end of field
 376        """
 377        for name, func in self.checked_reports:
 378            getattr(self, name).append(
 379                func(scores)
 380            )
 381
 382    #endregion
 383
 384    #region RUN METHODS
 385
 386    def _progress(self, count: int, total: int, status: str = ''):
 387
 388        part = count / total
 389
 390        filled_len = round(GeneticAlgorithm2.PROGRESS_BAR_LEN * part)
 391        percents = round(100.0 * part, 1)
 392        bar = '|' * filled_len + '_' * (GeneticAlgorithm2.PROGRESS_BAR_LEN - filled_len)
 393
 394        self.progress_stream.write('\r%s %s%s %s' % (bar, percents, '%', status))
 395        self.progress_stream.flush()
 396
 397    def __str__(self):
 398        return f"Genetic algorithm object with parameters {self.param}"
 399
 400    def __repr__(self):
 401        return self.__str__()
 402
 403    def _simulate(self, sample: array1D):
 404
 405        from func_timeout import func_timeout, FunctionTimedOut
 406
 407        obj = None
 408        eval_time = time.time()
 409        try:
 410            obj = func_timeout(
 411                self.funtimeout,
 412                lambda: self.f(sample)
 413            )
 414        except FunctionTimedOut:
 415            print("given function is not applicable")
 416        eval_time = time.time() - eval_time
 417
 418        assert obj is not None, (
 419            f"the given function was running like {eval_time} seconds and does not provide any output"
 420        )
 421
 422        tp = type(obj)
 423        assert (
 424            tp in (int, float) or np.issubdtype(tp, np.floating) or np.issubdtype(tp, np.integer)
 425        ), f"Minimized function should return a number, but got '{obj}' object with type {tp}"
 426
 427        return obj, eval_time
 428
 429    def _set_mutation_indexes(self, mutation_indexes: Optional[Iterable[int]]):
 430
 431        if mutation_indexes is None:
 432            self.indexes_float_mut = self.indexes_float
 433            self.indexes_int_mut = self.indexes_int
 434        else:
 435            tmp_indexes = set(mutation_indexes)
 436            self.indexes_int_mut = np.array(list(tmp_indexes.intersection(self.indexes_int)))
 437            self.indexes_float_mut = np.array(list(tmp_indexes.intersection(self.indexes_float)))
 438
 439            if self.indexes_float_mut.size == 0 and self.indexes_int_mut.size == 0:
 440                warnings.warn(f"No mutation dimensions!!! Check ur mutation indexes!!")
 441
 442    #@profile
 443    def run(
 444        self,
 445        no_plot: bool = False,
 446        disable_printing: bool = False,
 447        progress_bar_stream: Optional[str] = 'stdout',
 448
 449        # deprecated
 450        disable_progress_bar: bool = False,
 451
 452        function: FunctionToMinimize = None,
 453        function_timeout: Optional[float] = None,
 454
 455        set_function: SetFunctionToMinimize = None,
 456        apply_function_to_parents: bool = False,
 457        start_generation: GenerationConvertible = Generation(),
 458        studEA: bool = False,
 459        mutation_indexes: Optional[Iterable[int]] = None,
 460
 461        init_creator: Optional[CreatorFunc] = None,
 462        init_oppositors: Optional[Sequence[OppositorFunc]] = None,
 463
 464        duplicates_oppositor: Optional[OppositorFunc] = None,
 465        remove_duplicates_generation_step: Optional[int] = None,
 466
 467        revolution_oppositor: Optional[OppositorFunc] = None,
 468        revolution_after_stagnation_step: Optional[int] = None,
 469        revolution_part: float = 0.3,
 470
 471        population_initializer: Tuple[
 472            int, PopulationModifier
 473        ] = get_population_initializer(select_best_of=1, local_optimization_step='never', local_optimizer=None),
 474
 475        stop_when_reached: Optional[float] = None,
 476        callbacks: Optional[Sequence[SimpleCallbackFunc]] = None,
 477        middle_callbacks: Optional[Sequence[MiddleCallbackFunc]] = None,  #+
 478        time_limit_secs: Optional[float] = None,
 479        save_last_generation_as: Optional[str] = None,
 480        seed: Optional[int] = None
 481    ) -> GAResult:
 482        """
 483        runs optimization process
 484
 485        Args:
 486            no_plot: do not plot results using matplotlib by default
 487
 488            disable_printing: do not print log info of optimization process (except progress bar)
 489
 490            progress_bar_stream: 'stdout', 'stderr' or None to disable progress bar;
 491                disabling progress bar can speed up the optimization process in many cases
 492
 493            disable_progress_bar: deprecated
 494
 495            function: the given objective function (sample -> its score) to be minimized;
 496
 497            function_timeout: if the given function does not provide
 498                output before function_timeout (unit is seconds) the algorithm raises error.
 499                For example, when there is an infinite loop in the given function.
 500                `None` means disabling
 501
 502            set_function: set function (all samples -> score per sample) to be used instead of usual function
 503                (usually for optimization purposes)
 504
 505            apply_function_to_parents: whether to apply function to parents from previous generation (if it's needed), 
 506                it can be needed at working with games agents, but for other tasks will just waste time
 507
 508            start_generation: initial generation object of any `GenerationConvertible` type
 509
 510            studEA: using stud EA strategy (crossover with best object always)
 511
 512            mutation_indexes: indexes of dimensions where mutation can be performed (all dimensions by default)
 513
 514            init_creator: the function creates population samples.
 515                By default -- random uniform for real variables and random uniform for int
 516            init_oppositors: the list of oppositors creates oppositions for base population. No by default
 517
 518            duplicates_oppositor: oppositor for applying after duplicates removing.
 519                By default -- using just random initializer from creator
 520            remove_duplicates_generation_step: step for removing duplicates (have a sense with discrete tasks).
 521                No by default
 522
 523            revolution_oppositor: oppositor for revolution time. No by default
 524            revolution_after_stagnation_step: create revolution after this generations of stagnation. No by default
 525            revolution_part: float, the part of generation to being oppose. By default is 0.3
 526
 527            population_initializer: object for actions at population initialization step
 528                to create better start population. See doc
 529
 530            stop_when_reached: stop searching after reaching this value 
 531                (it can be potential minimum or something else)
 532
 533            callbacks: sequence of callback functions with structure:
 534                (generation_number, report_list, last_population, last_scores) -> do some action
 535
 536            middle_callbacks: sequence of functions made by `MiddleCallback` class
 537
 538            time_limit_secs: limit time of working (in seconds);
 539                `None` means no time limit (limit will be only for count of generation and so on)
 540
 541            save_last_generation_as: path to .npz file 
 542                for saving last_generation as numpy dictionary like
 543                {'population': 2D-array, 'scores': 1D-array}; 
 544                None disables this option
 545
 546            seed: random seed (None if doesn't matter)
 547
 548        Returns:
 549            `GAResult` object;
 550            also fills the self.report and self.result with many report information
 551
 552        Notes:
 553            - if `function_timeout` is enabled then `function` must be set
 554            - it would be more logical to use params like `studEA` as an algorithm parameter, 
 555                but `run()`-way can be more convenient for real using
 556        """
 557
 558        if disable_progress_bar:
 559            warnings.warn(
 560                f"disable_progress_bar is deprecated and will be removed in version 7, "
 561                f"use probress_bar_stream=None to disable progress bar"
 562            )
 563            progress_bar_stream = None
 564
 565        enable_printing: bool = not disable_printing
 566
 567        start_generation = Generation.from_object(self.dim, start_generation)
 568
 569        assert is_current_gen_number(revolution_after_stagnation_step), "must be None or int and >0"
 570        assert is_current_gen_number(remove_duplicates_generation_step), "must be None or int and >0"
 571        assert can_be_prob(revolution_part), f"revolution_part must be in [0,1], not {revolution_part}"
 572        assert stop_when_reached is None or isinstance(stop_when_reached, (int, float))
 573        assert isinstance(callbacks, collections.abc.Sequence) or callbacks is None, (
 574            "callbacks should be a list of callbacks functions"
 575        )
 576        assert isinstance(middle_callbacks, collections.abc.Sequence) or middle_callbacks is None, (
 577            "middle_callbacks should be list of MiddleCallbacks functions"
 578        )
 579        assert time_limit_secs is None or time_limit_secs > 0, 'time_limit_secs must be None of number > 0'
 580
 581        self._set_mutation_indexes(mutation_indexes)
 582        from OppOpPopInit import set_seed
 583        set_seed(seed)
 584
 585        # randomstate = np.random.default_rng(random.randint(0, 1000) if seed is None else seed)
 586        # self.randomstate = randomstate
 587
 588        # using bool flag is faster than using empty function with generated string arguments
 589        SHOW_PROGRESS = progress_bar_stream is not None
 590        if SHOW_PROGRESS:
 591
 592            show_progress = lambda t, t2, s: self._progress(t, t2, status=s)
 593
 594            if progress_bar_stream == 'stdout':
 595                self.progress_stream = sys.stdout
 596            elif progress_bar_stream == 'stderr':
 597                self.progress_stream = sys.stderr
 598            else:
 599                raise Exception(
 600                    f"wrong value {progress_bar_stream} of progress_bar_stream, must be 'stdout'/'stderr'/None"
 601                )
 602        else:
 603            show_progress = None
 604
 605        stop_by_val = (
 606            (lambda best_f: False)
 607            if stop_when_reached is None
 608            else (lambda best_f: best_f <= stop_when_reached)
 609        )
 610
 611        t: int = 0
 612        count_stagnation: int = 0
 613        pop: array2D = None
 614        scores: array1D = None
 615
 616        #region CALLBACKS
 617
 618        def get_data():
 619            """
 620            returns all important data about model
 621            """
 622            return MiddleCallbackData(
 623                last_generation=Generation(pop, scores),
 624                current_generation=t,
 625                report_list=self.report,
 626
 627                mutation_prob=self.prob_mut,
 628                mutation_discrete_prob=self.prob_mut_discrete,
 629                mutation=self.real_mutation,
 630                mutation_discrete=self.discrete_mutation,
 631                crossover=self.crossover,
 632                selection=self.selection,
 633
 634                current_stagnation=count_stagnation,
 635                max_stagnation=self.max_stagnations,
 636
 637                parents_portion=self.param.parents_portion,
 638                elit_ratio=self.param.elit_ratio,
 639
 640                set_function=self.set_function,
 641
 642                reason_to_stop=reason_to_stop
 643            )
 644
 645        def set_data(data: MiddleCallbackData):
 646            """
 647            sets data to model
 648            """
 649            nonlocal pop, scores, count_stagnation, reason_to_stop
 650
 651            pop, scores = data.last_generation.variables, data.last_generation.scores
 652            self.population_size = pop.shape[0]
 653
 654            self.param.parents_portion = data.parents_portion
 655            self._set_parents_count(data.parents_portion)
 656
 657            self.param.elit_ratio = data.elit_ratio
 658            self._set_elit_count(self.population_size, data.elit_ratio)
 659
 660            self.prob_mut = data.mutation_prob
 661            self.prob_mut_discrete = data.mutation_discrete_prob
 662            self.real_mutation = data.mutation
 663            self.discrete_mutation = data.mutation_discrete
 664            self.crossover = data.crossover
 665            self.selection = data.selection
 666
 667            count_stagnation = data.current_stagnation
 668            reason_to_stop = data.reason_to_stop
 669            self.max_stagnations = data.max_stagnation
 670
 671            self.set_function = data.set_function
 672
 673        if not callbacks:
 674            total_callback = lambda g, r, lp, ls: None
 675        else:
 676            def total_callback(
 677                generation_number: int,
 678                report_list: List[float],
 679                last_population: array2D,
 680                last_scores: array1D
 681            ):
 682                for cb in callbacks:
 683                    cb(generation_number, report_list, last_population, last_scores)
 684
 685        if not middle_callbacks:
 686            total_middle_callback = lambda: None
 687        else:
 688            def total_middle_callback():
 689                """
 690                applies callbacks and sets new data if there is a sense
 691                """
 692                data = get_data()
 693                flag = False
 694                for cb in middle_callbacks:
 695                    data, has_sense = cb(data)
 696                    if has_sense:
 697                        flag = True
 698                if flag:
 699                    set_data(data)  # update global date if there was real callback step
 700
 701        #endregion
 702
 703        start_time = time.time()
 704        time_is_done = (
 705            (lambda: False)
 706            if time_limit_secs is None
 707            else (lambda: int(time.time() - start_time) >= time_limit_secs)
 708        )
 709
 710        # combine with deprecated parts
 711        function = function or self.f
 712        function_timeout = function_timeout or self.funtimeout
 713
 714        assert function or set_function, 'no function to minimize'
 715        if function:
 716            self._check_function(function)
 717        if function_timeout:
 718            self._check_function_timeout(function_timeout)
 719
 720        self.set_function = set_function or GeneticAlgorithm2.default_set_function(self.f)
 721
 722        #region Initial population, duplicates filter, revolutionary
 723
 724        pop_coef, initializer_func = population_initializer
 725        
 726        # population creator by random or with oppositions
 727        if init_creator is None:
 728
 729            from OppOpPopInit import SampleInitializers
 730
 731            # just uniform random
 732            self.creator = SampleInitializers.Combined(
 733                minimums=[v[0] for v in self.var_bounds],
 734                maximums=[v[1] for v in self.var_bounds],
 735                indexes=(
 736                    self.indexes_int,
 737                    self.indexes_float
 738                ),
 739                creator_initializers=(
 740                    SampleInitializers.RandomInteger,
 741                    SampleInitializers.Uniform
 742                )
 743            )
 744        else:
 745            assert callable(init_creator)
 746            self.creator = init_creator
 747
 748        self.init_oppositors = init_oppositors
 749        self.dup_oppositor = duplicates_oppositor
 750        self.revolution_oppositor = revolution_oppositor
 751
 752        # event for removing duplicates
 753        if remove_duplicates_generation_step is None:
 754            def remover(pop: array2D, scores: array1D, gen: int) -> Tuple[
 755                array2D,
 756                array1D
 757            ]:
 758                return pop, scores
 759        else:
 760            
 761            def without_dup(pop: array2D, scores: array1D):  # returns population without dups
 762                _, index_of_dups = np.unique(pop, axis=0, return_index=True)
 763                return pop[index_of_dups], scores[index_of_dups], scores.size - index_of_dups.size
 764            
 765            if self.dup_oppositor is None:  # if there is no dup_oppositor, use random creator
 766                def remover(pop: array2D, scores: array1D, gen: int) -> Tuple[
 767                    array2D,
 768                    array1D
 769                ]:
 770                    if gen % remove_duplicates_generation_step != 0:
 771                        return pop, scores
 772
 773                    pp, sc, count_to_create = without_dup(pop, scores)  # pop without dups
 774                    
 775                    if count_to_create == 0:
 776                        if SHOW_PROGRESS:
 777                            show_progress(t, self.max_iterations,
 778                                      f"GA is running...{t} gen from {self.max_iterations}. No dups!")
 779                        return pop, scores
 780
 781                    pp2 = SampleInitializers.CreateSamples(self.creator, count_to_create)  # new pop elements
 782                    pp2_scores = self.set_function(pp2)  # new elements values
 783
 784                    if SHOW_PROGRESS:
 785                        show_progress(t, self.max_iterations,
 786                                      f"GA is running...{t} gen from {self.max_iterations}. Kill dups!")
 787                    
 788                    new_pop = np.vstack((pp, pp2))
 789                    new_scores = np.concatenate((sc, pp2_scores))
 790
 791                    args_to_sort = new_scores.argsort()
 792                    return new_pop[args_to_sort], new_scores[args_to_sort]
 793
 794            else:  # using oppositors
 795                assert callable(self.dup_oppositor)
 796
 797                def remover(pop: np.ndarray, scores: np.ndarray, gen: int) -> Tuple[
 798                    np.ndarray,
 799                    np.ndarray
 800                ]:
 801                    if gen % remove_duplicates_generation_step != 0:
 802                        return pop, scores
 803
 804                    pp, sc, count_to_create = without_dup(pop, scores)  # pop without dups
 805
 806                    if count_to_create == 0:
 807                        if SHOW_PROGRESS:
 808                            show_progress(t, self.max_iterations,
 809                                          f"GA is running...{t} gen from {self.max_iterations}. No dups!")
 810                        return pop, scores
 811
 812                    if count_to_create > sc.size:
 813                        raise Exception(
 814                            f"Too many duplicates at generation {gen} ({count_to_create} > {sc.size}), cannot oppose"
 815                        )
 816
 817                    # oppose count_to_create worse elements
 818                    pp2 = OppositionOperators.Reflect(pp[-count_to_create:], self.dup_oppositor)  # new pop elements
 819                    pp2_scores = self.set_function(pp2)  # new elements values
 820
 821                    if SHOW_PROGRESS:
 822                        show_progress(t, self.max_iterations,
 823                                      f"GA is running...{t} gen from {self.max_iterations}. Kill dups!")
 824
 825                    new_pop = np.vstack((pp, pp2))
 826                    new_scores = np.concatenate((sc, pp2_scores))
 827
 828                    args_to_sort = new_scores.argsort()
 829                    return new_pop[args_to_sort], new_scores[args_to_sort]
 830
 831        # event for revolution
 832        if revolution_after_stagnation_step is None:
 833            def revolution(pop: array2D, scores: array1D, stagnation_count: int) -> Tuple[
 834                array2D,
 835                array1D
 836            ]:
 837                return pop, scores
 838        else:
 839            if revolution_oppositor is None:
 840                raise Exception(
 841                    f"How can I make revolution each {revolution_after_stagnation_step} stagnation steps "
 842                    f"if revolution_oppositor is None (not defined)?"
 843                )
 844            assert callable(revolution_oppositor)
 845
 846            from OppOpPopInit import OppositionOperators
 847            
 848            def revolution(pop: array2D, scores: array1D, stagnation_count: int) -> Tuple[
 849                array2D,
 850                array1D
 851            ]:
 852                if stagnation_count < revolution_after_stagnation_step:
 853                    return pop, scores
 854                part = int(pop.shape[0]*revolution_part)
 855                
 856                pp2 = OppositionOperators.Reflect(pop[-part:], self.revolution_oppositor)
 857                pp2_scores = self.set_function(pp2)
 858
 859                combined = np.vstack((pop, pp2))
 860                combined_scores = np.concatenate((scores, pp2_scores))
 861                args = combined_scores.argsort()
 862
 863                if SHOW_PROGRESS:
 864                    show_progress(t, self.max_iterations,
 865                                  f"GA is running...{t} gen from {self.max_iterations}. Revolution!")
 866
 867                args = args[:scores.size]
 868                return combined[args], combined_scores[args]
 869
 870        #enregion
 871
 872        #
 873        #
 874        #  START ALGORITHM LOGIC
 875        #
 876        #
 877
 878        # Report
 879        self._clear_report()  # clear old report objects
 880        self._init_report()
 881
 882        # initialization of pop
 883        
 884        if start_generation.variables is None:
 885
 886            from OppOpPopInit import init_population
 887
 888            real_pop_size = self.population_size * pop_coef
 889
 890            # pop = np.empty((real_pop_size, self.dim))
 891            scores = np.empty(real_pop_size)
 892            
 893            pop = init_population(
 894                samples_count=real_pop_size,
 895                creator=self.creator,
 896                oppositors=self.init_oppositors
 897            )
 898
 899            if self.funtimeout and self.funtimeout > 0:  # perform simulation
 900            
 901                time_counter = 0
 902
 903                for p in range(0, real_pop_size):
 904                    # simulation returns exception or func value -- check the time of evaluating
 905                    value, eval_time = self._simulate(pop[p])
 906                    scores[p] = value
 907                    time_counter += eval_time
 908
 909                if enable_printing:
 910                    print(
 911                        f"\nSim: Average time of function evaluating (secs): "
 912                        f"{time_counter/real_pop_size} (total = {time_counter})\n"
 913                    )
 914            else:
 915
 916                eval_time = time.time()
 917                scores = self.set_function(pop)
 918                eval_time = time.time() - eval_time
 919                if enable_printing:
 920                    print(
 921                        f"\nSet: Average time of function evaluating (secs): "
 922                        f"{eval_time/real_pop_size} (total = {eval_time})\n"
 923                    )
 924                
 925        else:
 926
 927            self.population_size = start_generation.variables.shape[0]
 928            self._set_elit_count(self.population_size, self.param.elit_ratio)
 929            self._set_parents_count(self.param.parents_portion)
 930
 931            pop = start_generation.variables
 932
 933            if start_generation.scores is None:
 934
 935                _time = time.time()
 936                scores = self.set_function(pop)
 937                _time = time.time() - _time
 938
 939                if enable_printing:
 940                    print(
 941                        f'\nFirst scores are made from gotten variables '
 942                        f'(by {_time} secs, about {_time/scores.size} for each creature)\n'
 943                    )
 944            else:
 945                scores = start_generation.scores
 946                if enable_printing:
 947                    print(f"\nFirst scores are from gotten population\n")
 948
 949        # Initialization by select bests and local_descent
 950        
 951        pop, scores = initializer_func(pop, scores)
 952
 953        # first sort
 954        args_to_sort = scores.argsort()
 955        pop = pop[args_to_sort]
 956        scores = scores[args_to_sort]
 957        self._update_report(scores)
 958
 959        self.population_size = scores.size
 960        self.best_function = scores[0]
 961
 962        if enable_printing:
 963            print(
 964                f"Best score before optimization: {self.best_function}"
 965            )
 966
 967        t: int = 1
 968        count_stagnation: int = 0
 969        """iterations without progress"""
 970        reason_to_stop: Optional[str] = None
 971
 972        # gets indexes of 2 parents to crossover
 973        if studEA:
 974            get_parents_inds = lambda: (0, random.randrange(1, self.parents_count))
 975        else:
 976            get_parents_inds = lambda: random_indexes_pair(self.parents_count)
 977
 978        #  while no reason to stop
 979        while True:
 980
 981            if count_stagnation > self.max_stagnations:
 982                reason_to_stop = f"limit of fails: {count_stagnation}"
 983            elif t == self.max_iterations:
 984                reason_to_stop = f'limit of iterations: {t}'
 985            elif stop_by_val(self.best_function):
 986                reason_to_stop = f"stop value reached: {self.best_function} <= {stop_when_reached}"
 987            elif time_is_done():
 988                reason_to_stop = f'time is done: {time.time() - start_time} >= {time_limit_secs}'
 989
 990            if reason_to_stop is not None:
 991                if SHOW_PROGRESS:
 992                    show_progress(t, self.max_iterations, f"GA is running... STOP! {reason_to_stop}")
 993                break
 994
 995            if scores[0] < self.best_function:  # if there is progress
 996                count_stagnation = 0
 997                self.best_function = scores[0]
 998            else:
 999                count_stagnation += 1
1000
1001            if SHOW_PROGRESS:
1002                show_progress(
1003                    t,
1004                    self.max_iterations,
1005                    f"GA is running...{t} gen from {self.max_iterations}...best value = {self.best_function}"
1006                )
1007
1008            # Select parents
1009            
1010            par: array2D = np.empty((self.parents_count, self.dim))
1011            """samples chosen to create new samples"""
1012            par_scores: array1D = np.empty(self.parents_count)
1013
1014            elit_slice = slice(None, self.elit_count)
1015            """elit parents"""
1016
1017            # copy needs because the generation will be removed after parents selection
1018            par[elit_slice] = pop[elit_slice].copy()
1019            par_scores[elit_slice] = scores[elit_slice].copy()
1020
1021            new_par_inds = (
1022                self.selection(
1023                    scores[self.elit_count:],
1024                    self.parents_count - self.elit_count
1025                ).astype(np.int16) + self.elit_count
1026            )
1027            """non-elit parents indexes"""
1028            #new_par_inds = self.selection(scores, self.parents_count - self.elit_count).astype(np.int16)
1029            par_slice = slice(self.elit_count, self.parents_count)
1030            par[par_slice] = pop[new_par_inds].copy()
1031            par_scores[par_slice] = scores[new_par_inds].copy()
1032
1033            pop = np.empty((self.population_size, self.dim))
1034            """new generation"""
1035            scores = np.empty(self.population_size)
1036            """new generation scores"""
1037
1038            parents_slice = slice(None, self.parents_count)
1039            pop[parents_slice] = par
1040            scores[parents_slice] = par_scores
1041
1042            if self.fill_children is None:  # default fill children behaviour
1043                DO_MUTATION = self.needs_mutation
1044                for k in range(self.parents_count, self.population_size, 2):
1045
1046                    r1, r2 = get_parents_inds()
1047
1048                    pvar1 = pop[r1]  # equal to par[r1], but better for cache
1049                    pvar2 = pop[r2]
1050
1051                    ch1, ch2 = self.crossover(pvar1, pvar2)
1052
1053                    if DO_MUTATION:
1054                        ch1 = self.mut(ch1)
1055                        ch2 = self.mut_middle(ch2, pvar1, pvar2)
1056
1057                    pop[k] = ch1
1058                    pop[k+1] = ch2
1059            else:  # custom behaviour
1060                self.fill_children(pop, self.parents_count)
1061
1062            if apply_function_to_parents:
1063                scores = self.set_function(pop)
1064            else:
1065                scores[self.parents_count:] = self.set_function(pop[self.parents_count:])
1066            
1067            # remove duplicates
1068            pop, scores = remover(pop, scores, t)
1069            # revolution
1070            pop, scores = revolution(pop, scores, count_stagnation)
1071
1072            # make population better
1073            args_to_sort = scores.argsort()
1074            pop = pop[args_to_sort]
1075            scores = scores[args_to_sort]
1076            self._update_report(scores)
1077
1078            # callback it
1079            total_callback(t, self.report, pop, scores)
1080            total_middle_callback()
1081
1082            t += 1
1083
1084        self.best_function = fast_min(scores[0], self.best_function)
1085
1086        last_generation = Generation(pop, scores)
1087        self.result = GAResult(last_generation)
1088
1089        if save_last_generation_as is not None:
1090            last_generation.save(save_last_generation_as)
1091
1092        if enable_printing:
1093            show = ' ' * 200
1094            sys.stdout.write(
1095                f'\r{show}\n'
1096                f'\r The best found solution:\n {pop[0]}'
1097                f'\n\n Objective function:\n {self.best_function}\n'
1098                f'\n Used generations: {len(self.report)}'
1099                f'\n Used time: {time.time() - start_time:.3g} seconds\n'
1100            )
1101            sys.stdout.flush() 
1102        
1103        if not no_plot:
1104            self.plot_results()
1105
1106        if enable_printing:
1107            if reason_to_stop is not None and 'iterations' not in reason_to_stop:
1108                sys.stdout.write(
1109                    f'\nWarning: GA is terminated because of {reason_to_stop}'
1110                )
1111
1112        return self.result
1113
1114    #endregion
1115
1116    #region PLOTTING
1117
1118    def plot_results(
1119        self,
1120        title: str = 'Genetic Algorithm',
1121        save_as: Optional[str] = None,
1122        dpi: int = 200,
1123        main_color: str = 'blue'
1124     ):
1125        """
1126        Simple plot of self.report (if not empty)
1127        """
1128        if len(self.report) == 0:
1129            sys.stdout.write("No results to plot!\n")
1130            return
1131
1132        plot_several_lines(
1133            lines=[self.report],
1134            colors=[main_color],
1135            labels=['best of generation'],
1136            linewidths=[2],
1137            title=title,
1138            xlabel='Generation',
1139            ylabel='Minimized function',
1140            save_as=save_as,
1141            dpi=dpi
1142        )
1143
1144    def plot_generation_scores(self, title: str = 'Last generation scores', save_as: Optional[str] = None):
1145        """
1146        Plots barplot of scores of last population
1147        """
1148
1149        if not hasattr(self, 'result'):
1150            raise Exception(
1151                "There is no 'result' field into ga object! Before plotting generation u need to run seaching process"
1152            )
1153
1154        plot_pop_scores(self.result.last_generation.scores, title, save_as)
1155
1156    #endregion
1157
1158    #region MUTATION
1159    def mut(self, x: array1D):
1160        """
1161        just mutation
1162        """
1163
1164        for i in self.indexes_int_mut:
1165            if random.random() < self.prob_mut_discrete:
1166                x[i] = self.discrete_mutation(x[i], *self.var_bounds[i])
1167
1168        for i in self.indexes_float_mut:                
1169            if random.random() < self.prob_mut:
1170                x[i] = self.real_mutation(x[i], *self.var_bounds[i])
1171            
1172        return x
1173
1174    def mut_middle(self, x: array1D, p1: array1D, p2: array1D):
1175        """
1176        mutation oriented on parents
1177        """
1178        for i in self.indexes_int_mut:
1179
1180            if random.random() < self.prob_mut_discrete:
1181                v1, v2 = p1[i], p2[i]
1182                if v1 < v2:
1183                    x[i] = random.randint(v1, v2)
1184                elif v1 > v2:
1185                    x[i] = random.randint(v2, v1)
1186                else:
1187                    x[i] = random.randint(*self.var_bounds[i])
1188                        
1189        for i in self.indexes_float_mut:                
1190            if random.random() < self.prob_mut:
1191                v1, v2 = p1[i], p2[i]
1192                if v1 != v2:
1193                    x[i] = random.uniform(v1, v2)
1194                else:
1195                    x[i] = random.uniform(*self.var_bounds[i])
1196        return x
1197
1198    #endregion
1199
1200    #region Set functions
1201
1202    @staticmethod
1203    def default_set_function(function_for_set: Callable[[array1D], float]):
1204        """
1205        simple function for creating set_function 
1206        function_for_set just applies to each row of population
1207        """
1208        def func(matrix: array2D):
1209            return np.array(
1210                [function_for_set(row) for row in matrix]
1211            )
1212        return func
1213
1214    @staticmethod
1215    def vectorized_set_function(function_for_set: Callable[[array1D], float]):
1216        """
1217        works like default, but faster for big populations and slower for little
1218        function_for_set just applyes to each row of population
1219        """
1220
1221        func = np.vectorize(function_for_set, signature='(n)->()')
1222        return func
1223
1224    @staticmethod
1225    def set_function_multiprocess(function_for_set: Callable[[array1D], float], n_jobs: int = -1):
1226        """
1227        like function_for_set but uses joblib with n_jobs (-1 goes to count of available processors)
1228        """
1229        try:
1230            from joblib import Parallel, delayed
1231        except ModuleNotFoundError:
1232            raise ModuleNotFoundError(
1233                "this additional feature requires joblib package," 
1234                "run `pip install joblib` or `pip install --upgrade geneticalgorithm2[full]`" 
1235                "or use another set function"
1236            )
1237
1238        def func(matrix: array2D):
1239            result = Parallel(n_jobs=n_jobs)(delayed(function_for_set)(matrix[i]) for i in range(matrix.shape[0]))
1240            return np.array(result)
1241        return func
1242            
1243    #endregion
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257            
1258            
VARIABLE_TYPE: typing_extensions.TypeAlias = typing.Literal['int', 'real', 'bool']

the variable type for a given or all dimension, determines the values discretion: real: double numbers int: integer number only bool: in the fact is integer with bounds [0, 1]

class GeneticAlgorithm2:
  52class GeneticAlgorithm2:
  53    """
  54    Genetic algorithm optimization process
  55    """
  56    
  57    default_params = AlgorithmParams()
  58    PROGRESS_BAR_LEN = 20
  59    """max count of symbols in the progress bar"""
  60
  61    @property
  62    def output_dict(self):
  63        warnings.warn(
  64            "'output_dict' is deprecated and will be removed at version 7 \n"
  65            "use 'result' instead"
  66        )
  67        return self.result
  68
  69    @property
  70    def needs_mutation(self) -> bool:
  71        """whether the mutation is required"""
  72        return self.prob_mut > 0 or self.prob_mut_discrete > 0
  73
  74    #region INIT
  75
  76    def __init__(
  77        self,
  78        function: FunctionToMinimize = None,
  79
  80        dimension: int = 0,
  81        variable_type: Union[VARIABLE_TYPE, Sequence[VARIABLE_TYPE]] = 'bool',
  82        variable_boundaries: Optional[Union[array2D, Sequence[Tuple[float, float]]]] = None,
  83
  84        variable_type_mixed=None,
  85
  86        function_timeout: Optional[float] = None,
  87        algorithm_parameters: Union[AlgorithmParams, Dict[str, Any]] = default_params
  88    ):
  89        """
  90        initializes the GA object and performs main checks
  91
  92        Args:
  93            function: the given objective function to be minimized -- deprecated and moved to run() method
  94            dimension: the number of decision variables, the population samples dimension
  95
  96            variable_type: string means the variable type for all variables,
  97                for mixed types use sequence of strings of type for each variable
  98
  99            variable_boundaries: leave it None if variable_type is 'bool';
 100                otherwise provide a sequence of tuples of length two as boundaries for each variable;
 101                the length of the array must be equal dimension.
 102                For example, ([0,100], [0,200]) determines
 103                    lower boundary 0 and upper boundary 100 for first
 104                    and upper boundary 200 for second variable
 105                    and dimension must be 2.
 106
 107            variable_type_mixed -- deprecated
 108
 109            function_timeout: if the given function does not provide
 110                output before function_timeout (unit is seconds) the algorithm raises error.
 111                For example, when there is an infinite loop in the given function.
 112                `None` means disabling -- deprecated and moved to run()
 113
 114            algorithm_parameters: AlgorithmParams object or usual dictionary with algorithm parameter;
 115                it is not mandatory to provide all possible parameters
 116
 117        Notes:
 118            - This implementation minimizes the given objective function.
 119            For maximization u can multiply the function by -1 (for instance): the absolute
 120                value of the output would be the actual objective function
 121
 122        for more details and examples of implementation please visit:
 123            https://github.com/PasaOpasen/geneticalgorithm2
 124  
 125        """
 126
 127        # all default fields
 128
 129        # self.crossover: Callable[[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray]] = None
 130        # self.real_mutation: Callable[[float, float, float], float] = None
 131        # self.discrete_mutation: Callable[[int, int, int], int] = None
 132        # self.selection: Callable[[np.ndarray, int], np.ndarray] = None
 133
 134        self.result = None
 135        self.best_function = None
 136
 137        self.revolution_oppositor = None
 138        self.dup_oppositor = None
 139        self.creator = None
 140        self.init_oppositors = None
 141
 142        self.f: Callable[[array1D], float] = None
 143        self.funtimeout: float = None
 144
 145        self.set_function: Callable[[np.ndarray], np.ndarray] = None
 146
 147        # self.dim: int = None
 148        self.var_bounds: List[Tuple[Union[int, float], Union[int, float]]] = None
 149        # self.indexes_int: np.ndarray = None
 150        # self.indexes_float: np.ndarray = None
 151
 152        self.checked_reports: List[Tuple[str, Callable[[array1D], None]]] = None
 153
 154        self.population_size: int = None
 155        self.progress_stream = None
 156
 157        # input algorithm's parameters
 158
 159        assert isinstance(algorithm_parameters, (dict, AlgorithmParams)), (
 160            "algorithm_parameters must be dict or AlgorithmParams object"
 161        )
 162        if not isinstance(algorithm_parameters, AlgorithmParams):
 163            algorithm_parameters = AlgorithmParams.from_dict(algorithm_parameters)
 164        algorithm_parameters.validate()
 165        self.param = algorithm_parameters
 166
 167        self.crossover, self.real_mutation, self.discrete_mutation, self.selection = algorithm_parameters.get_CMS_funcs()
 168
 169        # dimension and area bounds
 170        self.dim = int(dimension)
 171        assert self.dim > 0, f"dimension count must be int and >0, gotten {dimension}"
 172
 173        if variable_type_mixed is not None:
 174            warnings.warn(
 175                f"argument variable_type_mixed is deprecated and will be removed at version 7\n "
 176                f"use variable_type={tuple(variable_type_mixed)} instead"
 177            )
 178            variable_type = variable_type_mixed
 179        self._set_types_indexes(variable_type)  # types indexes
 180        self._set_var_boundaries(variable_type, variable_boundaries)  # input variables' boundaries
 181
 182        # fix mutation probs
 183
 184        assert can_be_prob(self.param.mutation_probability)
 185        self.prob_mut = self.param.mutation_probability
 186        assert self.param.mutation_discrete_probability is None or can_be_prob(self.param.mutation_discrete_probability)
 187        self.prob_mut_discrete = self.param.mutation_discrete_probability or self.prob_mut
 188
 189        if self.param.crossover_probability is not None:
 190            warnings.warn(
 191                f"crossover_probability is deprecated and will be removed in version 7. "
 192                f"Reason: it's old and has no sense"
 193            )
 194
 195        #############################################################
 196        # input function
 197        if function:
 198            warnings.warn(
 199                f"function is deprecated in init constructor and will be removed in version 7. "
 200                f"Move this argument to run() method"
 201            )
 202            self._check_function(function)
 203        if function_timeout:
 204            warnings.warn(
 205                f"function_timeout is deprecated in init constructor and will be removed in version 7. "
 206                f"Move this argument to run() method"
 207            )
 208            self._check_function_timeout(function_timeout)
 209
 210        #############################################################
 211        
 212        self.population_size = int(self.param.population_size)
 213        self._set_parents_count(self.param.parents_portion)
 214        self._set_elit_count(self.population_size, self.param.elit_ratio)
 215        assert self.parents_count >= self.elit_count, (
 216            f"\n number of parents ({self.parents_count}) "
 217            f"must be greater than number of elits ({self.elit_count})"
 218        )
 219
 220        self._set_max_iterations()
 221
 222        self._set_report()
 223
 224        # specify this function to speed up or change default behaviour
 225        self.fill_children: Optional[Callable[[array2D, int], None]] = None
 226        """
 227        custom function which adds children for population POP 
 228            where POP[:parents_count] are parents lines and next lines are for children
 229        """
 230
 231    def _set_types_indexes(self, variable_type: Union[str, Sequence[str]]):
 232
 233        indexes = np.arange(self.dim)
 234        self.indexes_int = np.array([])
 235        self.indexes_float = np.array([])
 236
 237        assert_message = (
 238            f"\n variable_type must be 'bool', 'int', 'real' or a sequence with 'int' and 'real', got {variable_type}"
 239        )
 240
 241        if isinstance(variable_type, str):
 242            assert (variable_type in VARIABLE_TYPE.__args__), assert_message
 243            if variable_type == 'real':
 244                self.indexes_float = indexes
 245            else:
 246                self.indexes_int = indexes
 247
 248        else:  # sequence case
 249
 250            assert len(variable_type) == self.dim, (
 251                f"\n variable_type must have a length equal dimension. "
 252                f"Should be {self.dim}, got {len(variable_type)}"
 253            )
 254            assert 'bool' not in variable_type, (
 255                "don't use 'bool' if variable_type is a sequence, "
 256                "for 'boolean' case use 'int' and specify boundary as (0,1)"
 257            )
 258            assert all(v in VARIABLE_TYPE.__args__ for v in variable_type), assert_message
 259
 260            vartypes = np.array(variable_type)
 261            self.indexes_int = indexes[vartypes == 'int']
 262            self.indexes_float = indexes[vartypes == 'real']
 263
 264    def _set_var_boundaries(
 265        self,
 266        variable_type: Union[str, Sequence[str]],
 267        variable_boundaries
 268    ):
 269        if isinstance(variable_type, str) and variable_type == 'bool':
 270            self.var_bounds = [(0, 1)] * self.dim
 271        else:
 272
 273            if is_numpy(variable_boundaries):
 274                assert variable_boundaries.shape == (self.dim, 2), (
 275                    f"\n if variable_boundaries is numpy array, it must be with shape (dim, 2)"
 276                )
 277            else:
 278                assert len(variable_boundaries) == self.dim and all((len(t) == 2 for t in variable_boundaries)), (
 279                    "\n if variable_boundaries is sequence, "
 280                    "it must be with len dim and boundary for each variable must be a tuple of length two"
 281                )
 282
 283            for i in variable_boundaries:
 284                assert i[0] <= i[1], "\n lower_boundaries must be smaller than upper_boundaries [lower,upper]"
 285
 286            self.var_bounds = [(i[0], i[1]) for i in variable_boundaries]
 287
 288    def _set_parents_count(self, parents_portion: float):
 289
 290        self.parents_count = int(parents_portion * self.population_size)
 291        assert self.population_size >= self.parents_count > 1, (
 292            f'parents count {self.parents_count} cannot be less than population size {self.population_size}'
 293        )
 294        trl = self.population_size - self.parents_count
 295        if trl % 2 != 0:
 296            self.parents_count += 1
 297
 298    def _set_elit_count(self, pop_size: int, elit_ratio: Union[float, int]):
 299
 300        assert elit_ratio >= 0
 301        self.elit_count = elit_ratio if isinstance(elit_ratio, str) else math.ceil(pop_size*elit_ratio)
 302
 303    def _set_max_iterations(self):
 304
 305        if self.param.max_num_iteration is None:
 306            iterate = 0
 307            for i in range(0, self.dim):
 308                bound_min, bound_max = self.var_bounds[i]
 309                var_space = bound_max - bound_min
 310                if i in self.indexes_int:
 311                    iterate += var_space * self.dim * (100 / self.population_size)
 312                else:
 313                    iterate += var_space * 50 * (100 / self.population_size)
 314            iterate = int(iterate)
 315            if (iterate * self.population_size) > 10000000:
 316                iterate = 10000000 / self.population_size
 317
 318            self.max_iterations = fast_min(iterate, 8000)
 319        else:
 320            assert self.param.max_num_iteration > 0
 321            self.max_iterations = math.ceil(self.param.max_num_iteration)
 322
 323        max_it = self.param.max_iteration_without_improv
 324        if max_it is None:
 325            self.max_stagnations = self.max_iterations + 1
 326        else:
 327            self.max_stagnations = math.ceil(max_it)
 328
 329    def _check_function(self, function: Callable[[array1D], float]):
 330        assert callable(function), "function must be callable!"
 331        self.f = function
 332
 333    def _check_function_timeout(self, function_timeout: Optional[float]):
 334
 335        if function_timeout is not None and function_timeout > 0:
 336            try:
 337                from func_timeout import func_timeout, FunctionTimedOut
 338            except ModuleNotFoundError:
 339                raise ModuleNotFoundError(
 340                    "function_timeout > 0 needs additional package func_timeout\n"
 341                    "run `python -m pip install func_timeout`\n"
 342                    "or disable this parameter: function_timeout=None"
 343                )
 344
 345        self.funtimeout = None if function_timeout is None else float(function_timeout)
 346
 347    #endregion
 348
 349    #region REPORT
 350
 351    def _set_report(self):
 352        """
 353        creates default report checker
 354        """
 355        self.checked_reports = [
 356            # item 0 cuz scores will be sorted and min item is items[0]
 357            ('report', operator.itemgetter(0))
 358        ]
 359
 360    def _clear_report(self):
 361        """
 362        removes all report objects
 363        """
 364        fields = [f for f in vars(self).keys() if f.startswith('report')]
 365        for attr in fields:
 366            delattr(self, attr)
 367
 368    def _init_report(self):
 369        """
 370        makes empty report fields
 371        """
 372        for name, _ in self.checked_reports:
 373            setattr(self, name, [])
 374
 375    def _update_report(self, scores: array1D):
 376        """
 377        append report value to the end of field
 378        """
 379        for name, func in self.checked_reports:
 380            getattr(self, name).append(
 381                func(scores)
 382            )
 383
 384    #endregion
 385
 386    #region RUN METHODS
 387
 388    def _progress(self, count: int, total: int, status: str = ''):
 389
 390        part = count / total
 391
 392        filled_len = round(GeneticAlgorithm2.PROGRESS_BAR_LEN * part)
 393        percents = round(100.0 * part, 1)
 394        bar = '|' * filled_len + '_' * (GeneticAlgorithm2.PROGRESS_BAR_LEN - filled_len)
 395
 396        self.progress_stream.write('\r%s %s%s %s' % (bar, percents, '%', status))
 397        self.progress_stream.flush()
 398
 399    def __str__(self):
 400        return f"Genetic algorithm object with parameters {self.param}"
 401
 402    def __repr__(self):
 403        return self.__str__()
 404
 405    def _simulate(self, sample: array1D):
 406
 407        from func_timeout import func_timeout, FunctionTimedOut
 408
 409        obj = None
 410        eval_time = time.time()
 411        try:
 412            obj = func_timeout(
 413                self.funtimeout,
 414                lambda: self.f(sample)
 415            )
 416        except FunctionTimedOut:
 417            print("given function is not applicable")
 418        eval_time = time.time() - eval_time
 419
 420        assert obj is not None, (
 421            f"the given function was running like {eval_time} seconds and does not provide any output"
 422        )
 423
 424        tp = type(obj)
 425        assert (
 426            tp in (int, float) or np.issubdtype(tp, np.floating) or np.issubdtype(tp, np.integer)
 427        ), f"Minimized function should return a number, but got '{obj}' object with type {tp}"
 428
 429        return obj, eval_time
 430
 431    def _set_mutation_indexes(self, mutation_indexes: Optional[Iterable[int]]):
 432
 433        if mutation_indexes is None:
 434            self.indexes_float_mut = self.indexes_float
 435            self.indexes_int_mut = self.indexes_int
 436        else:
 437            tmp_indexes = set(mutation_indexes)
 438            self.indexes_int_mut = np.array(list(tmp_indexes.intersection(self.indexes_int)))
 439            self.indexes_float_mut = np.array(list(tmp_indexes.intersection(self.indexes_float)))
 440
 441            if self.indexes_float_mut.size == 0 and self.indexes_int_mut.size == 0:
 442                warnings.warn(f"No mutation dimensions!!! Check ur mutation indexes!!")
 443
 444    #@profile
 445    def run(
 446        self,
 447        no_plot: bool = False,
 448        disable_printing: bool = False,
 449        progress_bar_stream: Optional[str] = 'stdout',
 450
 451        # deprecated
 452        disable_progress_bar: bool = False,
 453
 454        function: FunctionToMinimize = None,
 455        function_timeout: Optional[float] = None,
 456
 457        set_function: SetFunctionToMinimize = None,
 458        apply_function_to_parents: bool = False,
 459        start_generation: GenerationConvertible = Generation(),
 460        studEA: bool = False,
 461        mutation_indexes: Optional[Iterable[int]] = None,
 462
 463        init_creator: Optional[CreatorFunc] = None,
 464        init_oppositors: Optional[Sequence[OppositorFunc]] = None,
 465
 466        duplicates_oppositor: Optional[OppositorFunc] = None,
 467        remove_duplicates_generation_step: Optional[int] = None,
 468
 469        revolution_oppositor: Optional[OppositorFunc] = None,
 470        revolution_after_stagnation_step: Optional[int] = None,
 471        revolution_part: float = 0.3,
 472
 473        population_initializer: Tuple[
 474            int, PopulationModifier
 475        ] = get_population_initializer(select_best_of=1, local_optimization_step='never', local_optimizer=None),
 476
 477        stop_when_reached: Optional[float] = None,
 478        callbacks: Optional[Sequence[SimpleCallbackFunc]] = None,
 479        middle_callbacks: Optional[Sequence[MiddleCallbackFunc]] = None,  #+
 480        time_limit_secs: Optional[float] = None,
 481        save_last_generation_as: Optional[str] = None,
 482        seed: Optional[int] = None
 483    ) -> GAResult:
 484        """
 485        runs optimization process
 486
 487        Args:
 488            no_plot: do not plot results using matplotlib by default
 489
 490            disable_printing: do not print log info of optimization process (except progress bar)
 491
 492            progress_bar_stream: 'stdout', 'stderr' or None to disable progress bar;
 493                disabling progress bar can speed up the optimization process in many cases
 494
 495            disable_progress_bar: deprecated
 496
 497            function: the given objective function (sample -> its score) to be minimized;
 498
 499            function_timeout: if the given function does not provide
 500                output before function_timeout (unit is seconds) the algorithm raises error.
 501                For example, when there is an infinite loop in the given function.
 502                `None` means disabling
 503
 504            set_function: set function (all samples -> score per sample) to be used instead of usual function
 505                (usually for optimization purposes)
 506
 507            apply_function_to_parents: whether to apply function to parents from previous generation (if it's needed), 
 508                it can be needed at working with games agents, but for other tasks will just waste time
 509
 510            start_generation: initial generation object of any `GenerationConvertible` type
 511
 512            studEA: using stud EA strategy (crossover with best object always)
 513
 514            mutation_indexes: indexes of dimensions where mutation can be performed (all dimensions by default)
 515
 516            init_creator: the function creates population samples.
 517                By default -- random uniform for real variables and random uniform for int
 518            init_oppositors: the list of oppositors creates oppositions for base population. No by default
 519
 520            duplicates_oppositor: oppositor for applying after duplicates removing.
 521                By default -- using just random initializer from creator
 522            remove_duplicates_generation_step: step for removing duplicates (have a sense with discrete tasks).
 523                No by default
 524
 525            revolution_oppositor: oppositor for revolution time. No by default
 526            revolution_after_stagnation_step: create revolution after this generations of stagnation. No by default
 527            revolution_part: float, the part of generation to being oppose. By default is 0.3
 528
 529            population_initializer: object for actions at population initialization step
 530                to create better start population. See doc
 531
 532            stop_when_reached: stop searching after reaching this value 
 533                (it can be potential minimum or something else)
 534
 535            callbacks: sequence of callback functions with structure:
 536                (generation_number, report_list, last_population, last_scores) -> do some action
 537
 538            middle_callbacks: sequence of functions made by `MiddleCallback` class
 539
 540            time_limit_secs: limit time of working (in seconds);
 541                `None` means no time limit (limit will be only for count of generation and so on)
 542
 543            save_last_generation_as: path to .npz file 
 544                for saving last_generation as numpy dictionary like
 545                {'population': 2D-array, 'scores': 1D-array}; 
 546                None disables this option
 547
 548            seed: random seed (None if doesn't matter)
 549
 550        Returns:
 551            `GAResult` object;
 552            also fills the self.report and self.result with many report information
 553
 554        Notes:
 555            - if `function_timeout` is enabled then `function` must be set
 556            - it would be more logical to use params like `studEA` as an algorithm parameter, 
 557                but `run()`-way can be more convenient for real using
 558        """
 559
 560        if disable_progress_bar:
 561            warnings.warn(
 562                f"disable_progress_bar is deprecated and will be removed in version 7, "
 563                f"use probress_bar_stream=None to disable progress bar"
 564            )
 565            progress_bar_stream = None
 566
 567        enable_printing: bool = not disable_printing
 568
 569        start_generation = Generation.from_object(self.dim, start_generation)
 570
 571        assert is_current_gen_number(revolution_after_stagnation_step), "must be None or int and >0"
 572        assert is_current_gen_number(remove_duplicates_generation_step), "must be None or int and >0"
 573        assert can_be_prob(revolution_part), f"revolution_part must be in [0,1], not {revolution_part}"
 574        assert stop_when_reached is None or isinstance(stop_when_reached, (int, float))
 575        assert isinstance(callbacks, collections.abc.Sequence) or callbacks is None, (
 576            "callbacks should be a list of callbacks functions"
 577        )
 578        assert isinstance(middle_callbacks, collections.abc.Sequence) or middle_callbacks is None, (
 579            "middle_callbacks should be list of MiddleCallbacks functions"
 580        )
 581        assert time_limit_secs is None or time_limit_secs > 0, 'time_limit_secs must be None of number > 0'
 582
 583        self._set_mutation_indexes(mutation_indexes)
 584        from OppOpPopInit import set_seed
 585        set_seed(seed)
 586
 587        # randomstate = np.random.default_rng(random.randint(0, 1000) if seed is None else seed)
 588        # self.randomstate = randomstate
 589
 590        # using bool flag is faster than using empty function with generated string arguments
 591        SHOW_PROGRESS = progress_bar_stream is not None
 592        if SHOW_PROGRESS:
 593
 594            show_progress = lambda t, t2, s: self._progress(t, t2, status=s)
 595
 596            if progress_bar_stream == 'stdout':
 597                self.progress_stream = sys.stdout
 598            elif progress_bar_stream == 'stderr':
 599                self.progress_stream = sys.stderr
 600            else:
 601                raise Exception(
 602                    f"wrong value {progress_bar_stream} of progress_bar_stream, must be 'stdout'/'stderr'/None"
 603                )
 604        else:
 605            show_progress = None
 606
 607        stop_by_val = (
 608            (lambda best_f: False)
 609            if stop_when_reached is None
 610            else (lambda best_f: best_f <= stop_when_reached)
 611        )
 612
 613        t: int = 0
 614        count_stagnation: int = 0
 615        pop: array2D = None
 616        scores: array1D = None
 617
 618        #region CALLBACKS
 619
 620        def get_data():
 621            """
 622            returns all important data about model
 623            """
 624            return MiddleCallbackData(
 625                last_generation=Generation(pop, scores),
 626                current_generation=t,
 627                report_list=self.report,
 628
 629                mutation_prob=self.prob_mut,
 630                mutation_discrete_prob=self.prob_mut_discrete,
 631                mutation=self.real_mutation,
 632                mutation_discrete=self.discrete_mutation,
 633                crossover=self.crossover,
 634                selection=self.selection,
 635
 636                current_stagnation=count_stagnation,
 637                max_stagnation=self.max_stagnations,
 638
 639                parents_portion=self.param.parents_portion,
 640                elit_ratio=self.param.elit_ratio,
 641
 642                set_function=self.set_function,
 643
 644                reason_to_stop=reason_to_stop
 645            )
 646
 647        def set_data(data: MiddleCallbackData):
 648            """
 649            sets data to model
 650            """
 651            nonlocal pop, scores, count_stagnation, reason_to_stop
 652
 653            pop, scores = data.last_generation.variables, data.last_generation.scores
 654            self.population_size = pop.shape[0]
 655
 656            self.param.parents_portion = data.parents_portion
 657            self._set_parents_count(data.parents_portion)
 658
 659            self.param.elit_ratio = data.elit_ratio
 660            self._set_elit_count(self.population_size, data.elit_ratio)
 661
 662            self.prob_mut = data.mutation_prob
 663            self.prob_mut_discrete = data.mutation_discrete_prob
 664            self.real_mutation = data.mutation
 665            self.discrete_mutation = data.mutation_discrete
 666            self.crossover = data.crossover
 667            self.selection = data.selection
 668
 669            count_stagnation = data.current_stagnation
 670            reason_to_stop = data.reason_to_stop
 671            self.max_stagnations = data.max_stagnation
 672
 673            self.set_function = data.set_function
 674
 675        if not callbacks:
 676            total_callback = lambda g, r, lp, ls: None
 677        else:
 678            def total_callback(
 679                generation_number: int,
 680                report_list: List[float],
 681                last_population: array2D,
 682                last_scores: array1D
 683            ):
 684                for cb in callbacks:
 685                    cb(generation_number, report_list, last_population, last_scores)
 686
 687        if not middle_callbacks:
 688            total_middle_callback = lambda: None
 689        else:
 690            def total_middle_callback():
 691                """
 692                applies callbacks and sets new data if there is a sense
 693                """
 694                data = get_data()
 695                flag = False
 696                for cb in middle_callbacks:
 697                    data, has_sense = cb(data)
 698                    if has_sense:
 699                        flag = True
 700                if flag:
 701                    set_data(data)  # update global date if there was real callback step
 702
 703        #endregion
 704
 705        start_time = time.time()
 706        time_is_done = (
 707            (lambda: False)
 708            if time_limit_secs is None
 709            else (lambda: int(time.time() - start_time) >= time_limit_secs)
 710        )
 711
 712        # combine with deprecated parts
 713        function = function or self.f
 714        function_timeout = function_timeout or self.funtimeout
 715
 716        assert function or set_function, 'no function to minimize'
 717        if function:
 718            self._check_function(function)
 719        if function_timeout:
 720            self._check_function_timeout(function_timeout)
 721
 722        self.set_function = set_function or GeneticAlgorithm2.default_set_function(self.f)
 723
 724        #region Initial population, duplicates filter, revolutionary
 725
 726        pop_coef, initializer_func = population_initializer
 727        
 728        # population creator by random or with oppositions
 729        if init_creator is None:
 730
 731            from OppOpPopInit import SampleInitializers
 732
 733            # just uniform random
 734            self.creator = SampleInitializers.Combined(
 735                minimums=[v[0] for v in self.var_bounds],
 736                maximums=[v[1] for v in self.var_bounds],
 737                indexes=(
 738                    self.indexes_int,
 739                    self.indexes_float
 740                ),
 741                creator_initializers=(
 742                    SampleInitializers.RandomInteger,
 743                    SampleInitializers.Uniform
 744                )
 745            )
 746        else:
 747            assert callable(init_creator)
 748            self.creator = init_creator
 749
 750        self.init_oppositors = init_oppositors
 751        self.dup_oppositor = duplicates_oppositor
 752        self.revolution_oppositor = revolution_oppositor
 753
 754        # event for removing duplicates
 755        if remove_duplicates_generation_step is None:
 756            def remover(pop: array2D, scores: array1D, gen: int) -> Tuple[
 757                array2D,
 758                array1D
 759            ]:
 760                return pop, scores
 761        else:
 762            
 763            def without_dup(pop: array2D, scores: array1D):  # returns population without dups
 764                _, index_of_dups = np.unique(pop, axis=0, return_index=True)
 765                return pop[index_of_dups], scores[index_of_dups], scores.size - index_of_dups.size
 766            
 767            if self.dup_oppositor is None:  # if there is no dup_oppositor, use random creator
 768                def remover(pop: array2D, scores: array1D, gen: int) -> Tuple[
 769                    array2D,
 770                    array1D
 771                ]:
 772                    if gen % remove_duplicates_generation_step != 0:
 773                        return pop, scores
 774
 775                    pp, sc, count_to_create = without_dup(pop, scores)  # pop without dups
 776                    
 777                    if count_to_create == 0:
 778                        if SHOW_PROGRESS:
 779                            show_progress(t, self.max_iterations,
 780                                      f"GA is running...{t} gen from {self.max_iterations}. No dups!")
 781                        return pop, scores
 782
 783                    pp2 = SampleInitializers.CreateSamples(self.creator, count_to_create)  # new pop elements
 784                    pp2_scores = self.set_function(pp2)  # new elements values
 785
 786                    if SHOW_PROGRESS:
 787                        show_progress(t, self.max_iterations,
 788                                      f"GA is running...{t} gen from {self.max_iterations}. Kill dups!")
 789                    
 790                    new_pop = np.vstack((pp, pp2))
 791                    new_scores = np.concatenate((sc, pp2_scores))
 792
 793                    args_to_sort = new_scores.argsort()
 794                    return new_pop[args_to_sort], new_scores[args_to_sort]
 795
 796            else:  # using oppositors
 797                assert callable(self.dup_oppositor)
 798
 799                def remover(pop: np.ndarray, scores: np.ndarray, gen: int) -> Tuple[
 800                    np.ndarray,
 801                    np.ndarray
 802                ]:
 803                    if gen % remove_duplicates_generation_step != 0:
 804                        return pop, scores
 805
 806                    pp, sc, count_to_create = without_dup(pop, scores)  # pop without dups
 807
 808                    if count_to_create == 0:
 809                        if SHOW_PROGRESS:
 810                            show_progress(t, self.max_iterations,
 811                                          f"GA is running...{t} gen from {self.max_iterations}. No dups!")
 812                        return pop, scores
 813
 814                    if count_to_create > sc.size:
 815                        raise Exception(
 816                            f"Too many duplicates at generation {gen} ({count_to_create} > {sc.size}), cannot oppose"
 817                        )
 818
 819                    # oppose count_to_create worse elements
 820                    pp2 = OppositionOperators.Reflect(pp[-count_to_create:], self.dup_oppositor)  # new pop elements
 821                    pp2_scores = self.set_function(pp2)  # new elements values
 822
 823                    if SHOW_PROGRESS:
 824                        show_progress(t, self.max_iterations,
 825                                      f"GA is running...{t} gen from {self.max_iterations}. Kill dups!")
 826
 827                    new_pop = np.vstack((pp, pp2))
 828                    new_scores = np.concatenate((sc, pp2_scores))
 829
 830                    args_to_sort = new_scores.argsort()
 831                    return new_pop[args_to_sort], new_scores[args_to_sort]
 832
 833        # event for revolution
 834        if revolution_after_stagnation_step is None:
 835            def revolution(pop: array2D, scores: array1D, stagnation_count: int) -> Tuple[
 836                array2D,
 837                array1D
 838            ]:
 839                return pop, scores
 840        else:
 841            if revolution_oppositor is None:
 842                raise Exception(
 843                    f"How can I make revolution each {revolution_after_stagnation_step} stagnation steps "
 844                    f"if revolution_oppositor is None (not defined)?"
 845                )
 846            assert callable(revolution_oppositor)
 847
 848            from OppOpPopInit import OppositionOperators
 849            
 850            def revolution(pop: array2D, scores: array1D, stagnation_count: int) -> Tuple[
 851                array2D,
 852                array1D
 853            ]:
 854                if stagnation_count < revolution_after_stagnation_step:
 855                    return pop, scores
 856                part = int(pop.shape[0]*revolution_part)
 857                
 858                pp2 = OppositionOperators.Reflect(pop[-part:], self.revolution_oppositor)
 859                pp2_scores = self.set_function(pp2)
 860
 861                combined = np.vstack((pop, pp2))
 862                combined_scores = np.concatenate((scores, pp2_scores))
 863                args = combined_scores.argsort()
 864
 865                if SHOW_PROGRESS:
 866                    show_progress(t, self.max_iterations,
 867                                  f"GA is running...{t} gen from {self.max_iterations}. Revolution!")
 868
 869                args = args[:scores.size]
 870                return combined[args], combined_scores[args]
 871
 872        #enregion
 873
 874        #
 875        #
 876        #  START ALGORITHM LOGIC
 877        #
 878        #
 879
 880        # Report
 881        self._clear_report()  # clear old report objects
 882        self._init_report()
 883
 884        # initialization of pop
 885        
 886        if start_generation.variables is None:
 887
 888            from OppOpPopInit import init_population
 889
 890            real_pop_size = self.population_size * pop_coef
 891
 892            # pop = np.empty((real_pop_size, self.dim))
 893            scores = np.empty(real_pop_size)
 894            
 895            pop = init_population(
 896                samples_count=real_pop_size,
 897                creator=self.creator,
 898                oppositors=self.init_oppositors
 899            )
 900
 901            if self.funtimeout and self.funtimeout > 0:  # perform simulation
 902            
 903                time_counter = 0
 904
 905                for p in range(0, real_pop_size):
 906                    # simulation returns exception or func value -- check the time of evaluating
 907                    value, eval_time = self._simulate(pop[p])
 908                    scores[p] = value
 909                    time_counter += eval_time
 910
 911                if enable_printing:
 912                    print(
 913                        f"\nSim: Average time of function evaluating (secs): "
 914                        f"{time_counter/real_pop_size} (total = {time_counter})\n"
 915                    )
 916            else:
 917
 918                eval_time = time.time()
 919                scores = self.set_function(pop)
 920                eval_time = time.time() - eval_time
 921                if enable_printing:
 922                    print(
 923                        f"\nSet: Average time of function evaluating (secs): "
 924                        f"{eval_time/real_pop_size} (total = {eval_time})\n"
 925                    )
 926                
 927        else:
 928
 929            self.population_size = start_generation.variables.shape[0]
 930            self._set_elit_count(self.population_size, self.param.elit_ratio)
 931            self._set_parents_count(self.param.parents_portion)
 932
 933            pop = start_generation.variables
 934
 935            if start_generation.scores is None:
 936
 937                _time = time.time()
 938                scores = self.set_function(pop)
 939                _time = time.time() - _time
 940
 941                if enable_printing:
 942                    print(
 943                        f'\nFirst scores are made from gotten variables '
 944                        f'(by {_time} secs, about {_time/scores.size} for each creature)\n'
 945                    )
 946            else:
 947                scores = start_generation.scores
 948                if enable_printing:
 949                    print(f"\nFirst scores are from gotten population\n")
 950
 951        # Initialization by select bests and local_descent
 952        
 953        pop, scores = initializer_func(pop, scores)
 954
 955        # first sort
 956        args_to_sort = scores.argsort()
 957        pop = pop[args_to_sort]
 958        scores = scores[args_to_sort]
 959        self._update_report(scores)
 960
 961        self.population_size = scores.size
 962        self.best_function = scores[0]
 963
 964        if enable_printing:
 965            print(
 966                f"Best score before optimization: {self.best_function}"
 967            )
 968
 969        t: int = 1
 970        count_stagnation: int = 0
 971        """iterations without progress"""
 972        reason_to_stop: Optional[str] = None
 973
 974        # gets indexes of 2 parents to crossover
 975        if studEA:
 976            get_parents_inds = lambda: (0, random.randrange(1, self.parents_count))
 977        else:
 978            get_parents_inds = lambda: random_indexes_pair(self.parents_count)
 979
 980        #  while no reason to stop
 981        while True:
 982
 983            if count_stagnation > self.max_stagnations:
 984                reason_to_stop = f"limit of fails: {count_stagnation}"
 985            elif t == self.max_iterations:
 986                reason_to_stop = f'limit of iterations: {t}'
 987            elif stop_by_val(self.best_function):
 988                reason_to_stop = f"stop value reached: {self.best_function} <= {stop_when_reached}"
 989            elif time_is_done():
 990                reason_to_stop = f'time is done: {time.time() - start_time} >= {time_limit_secs}'
 991
 992            if reason_to_stop is not None:
 993                if SHOW_PROGRESS:
 994                    show_progress(t, self.max_iterations, f"GA is running... STOP! {reason_to_stop}")
 995                break
 996
 997            if scores[0] < self.best_function:  # if there is progress
 998                count_stagnation = 0
 999                self.best_function = scores[0]
1000            else:
1001                count_stagnation += 1
1002
1003            if SHOW_PROGRESS:
1004                show_progress(
1005                    t,
1006                    self.max_iterations,
1007                    f"GA is running...{t} gen from {self.max_iterations}...best value = {self.best_function}"
1008                )
1009
1010            # Select parents
1011            
1012            par: array2D = np.empty((self.parents_count, self.dim))
1013            """samples chosen to create new samples"""
1014            par_scores: array1D = np.empty(self.parents_count)
1015
1016            elit_slice = slice(None, self.elit_count)
1017            """elit parents"""
1018
1019            # copy needs because the generation will be removed after parents selection
1020            par[elit_slice] = pop[elit_slice].copy()
1021            par_scores[elit_slice] = scores[elit_slice].copy()
1022
1023            new_par_inds = (
1024                self.selection(
1025                    scores[self.elit_count:],
1026                    self.parents_count - self.elit_count
1027                ).astype(np.int16) + self.elit_count
1028            )
1029            """non-elit parents indexes"""
1030            #new_par_inds = self.selection(scores, self.parents_count - self.elit_count).astype(np.int16)
1031            par_slice = slice(self.elit_count, self.parents_count)
1032            par[par_slice] = pop[new_par_inds].copy()
1033            par_scores[par_slice] = scores[new_par_inds].copy()
1034
1035            pop = np.empty((self.population_size, self.dim))
1036            """new generation"""
1037            scores = np.empty(self.population_size)
1038            """new generation scores"""
1039
1040            parents_slice = slice(None, self.parents_count)
1041            pop[parents_slice] = par
1042            scores[parents_slice] = par_scores
1043
1044            if self.fill_children is None:  # default fill children behaviour
1045                DO_MUTATION = self.needs_mutation
1046                for k in range(self.parents_count, self.population_size, 2):
1047
1048                    r1, r2 = get_parents_inds()
1049
1050                    pvar1 = pop[r1]  # equal to par[r1], but better for cache
1051                    pvar2 = pop[r2]
1052
1053                    ch1, ch2 = self.crossover(pvar1, pvar2)
1054
1055                    if DO_MUTATION:
1056                        ch1 = self.mut(ch1)
1057                        ch2 = self.mut_middle(ch2, pvar1, pvar2)
1058
1059                    pop[k] = ch1
1060                    pop[k+1] = ch2
1061            else:  # custom behaviour
1062                self.fill_children(pop, self.parents_count)
1063
1064            if apply_function_to_parents:
1065                scores = self.set_function(pop)
1066            else:
1067                scores[self.parents_count:] = self.set_function(pop[self.parents_count:])
1068            
1069            # remove duplicates
1070            pop, scores = remover(pop, scores, t)
1071            # revolution
1072            pop, scores = revolution(pop, scores, count_stagnation)
1073
1074            # make population better
1075            args_to_sort = scores.argsort()
1076            pop = pop[args_to_sort]
1077            scores = scores[args_to_sort]
1078            self._update_report(scores)
1079
1080            # callback it
1081            total_callback(t, self.report, pop, scores)
1082            total_middle_callback()
1083
1084            t += 1
1085
1086        self.best_function = fast_min(scores[0], self.best_function)
1087
1088        last_generation = Generation(pop, scores)
1089        self.result = GAResult(last_generation)
1090
1091        if save_last_generation_as is not None:
1092            last_generation.save(save_last_generation_as)
1093
1094        if enable_printing:
1095            show = ' ' * 200
1096            sys.stdout.write(
1097                f'\r{show}\n'
1098                f'\r The best found solution:\n {pop[0]}'
1099                f'\n\n Objective function:\n {self.best_function}\n'
1100                f'\n Used generations: {len(self.report)}'
1101                f'\n Used time: {time.time() - start_time:.3g} seconds\n'
1102            )
1103            sys.stdout.flush() 
1104        
1105        if not no_plot:
1106            self.plot_results()
1107
1108        if enable_printing:
1109            if reason_to_stop is not None and 'iterations' not in reason_to_stop:
1110                sys.stdout.write(
1111                    f'\nWarning: GA is terminated because of {reason_to_stop}'
1112                )
1113
1114        return self.result
1115
1116    #endregion
1117
1118    #region PLOTTING
1119
1120    def plot_results(
1121        self,
1122        title: str = 'Genetic Algorithm',
1123        save_as: Optional[str] = None,
1124        dpi: int = 200,
1125        main_color: str = 'blue'
1126     ):
1127        """
1128        Simple plot of self.report (if not empty)
1129        """
1130        if len(self.report) == 0:
1131            sys.stdout.write("No results to plot!\n")
1132            return
1133
1134        plot_several_lines(
1135            lines=[self.report],
1136            colors=[main_color],
1137            labels=['best of generation'],
1138            linewidths=[2],
1139            title=title,
1140            xlabel='Generation',
1141            ylabel='Minimized function',
1142            save_as=save_as,
1143            dpi=dpi
1144        )
1145
1146    def plot_generation_scores(self, title: str = 'Last generation scores', save_as: Optional[str] = None):
1147        """
1148        Plots barplot of scores of last population
1149        """
1150
1151        if not hasattr(self, 'result'):
1152            raise Exception(
1153                "There is no 'result' field into ga object! Before plotting generation u need to run seaching process"
1154            )
1155
1156        plot_pop_scores(self.result.last_generation.scores, title, save_as)
1157
1158    #endregion
1159
1160    #region MUTATION
1161    def mut(self, x: array1D):
1162        """
1163        just mutation
1164        """
1165
1166        for i in self.indexes_int_mut:
1167            if random.random() < self.prob_mut_discrete:
1168                x[i] = self.discrete_mutation(x[i], *self.var_bounds[i])
1169
1170        for i in self.indexes_float_mut:                
1171            if random.random() < self.prob_mut:
1172                x[i] = self.real_mutation(x[i], *self.var_bounds[i])
1173            
1174        return x
1175
1176    def mut_middle(self, x: array1D, p1: array1D, p2: array1D):
1177        """
1178        mutation oriented on parents
1179        """
1180        for i in self.indexes_int_mut:
1181
1182            if random.random() < self.prob_mut_discrete:
1183                v1, v2 = p1[i], p2[i]
1184                if v1 < v2:
1185                    x[i] = random.randint(v1, v2)
1186                elif v1 > v2:
1187                    x[i] = random.randint(v2, v1)
1188                else:
1189                    x[i] = random.randint(*self.var_bounds[i])
1190                        
1191        for i in self.indexes_float_mut:                
1192            if random.random() < self.prob_mut:
1193                v1, v2 = p1[i], p2[i]
1194                if v1 != v2:
1195                    x[i] = random.uniform(v1, v2)
1196                else:
1197                    x[i] = random.uniform(*self.var_bounds[i])
1198        return x
1199
1200    #endregion
1201
1202    #region Set functions
1203
1204    @staticmethod
1205    def default_set_function(function_for_set: Callable[[array1D], float]):
1206        """
1207        simple function for creating set_function 
1208        function_for_set just applies to each row of population
1209        """
1210        def func(matrix: array2D):
1211            return np.array(
1212                [function_for_set(row) for row in matrix]
1213            )
1214        return func
1215
1216    @staticmethod
1217    def vectorized_set_function(function_for_set: Callable[[array1D], float]):
1218        """
1219        works like default, but faster for big populations and slower for little
1220        function_for_set just applyes to each row of population
1221        """
1222
1223        func = np.vectorize(function_for_set, signature='(n)->()')
1224        return func
1225
1226    @staticmethod
1227    def set_function_multiprocess(function_for_set: Callable[[array1D], float], n_jobs: int = -1):
1228        """
1229        like function_for_set but uses joblib with n_jobs (-1 goes to count of available processors)
1230        """
1231        try:
1232            from joblib import Parallel, delayed
1233        except ModuleNotFoundError:
1234            raise ModuleNotFoundError(
1235                "this additional feature requires joblib package," 
1236                "run `pip install joblib` or `pip install --upgrade geneticalgorithm2[full]`" 
1237                "or use another set function"
1238            )
1239
1240        def func(matrix: array2D):
1241            result = Parallel(n_jobs=n_jobs)(delayed(function_for_set)(matrix[i]) for i in range(matrix.shape[0]))
1242            return np.array(result)
1243        return func
1244            
1245    #endregion

Genetic algorithm optimization process

GeneticAlgorithm2( function: Callable[[numpy.ndarray], float] = None, dimension: int = 0, variable_type: Union[Literal['int', 'real', 'bool'], Sequence[Literal['int', 'real', 'bool']]] = 'bool', variable_boundaries: Union[numpy.ndarray, Sequence[Tuple[float, float]], NoneType] = None, variable_type_mixed=None, function_timeout: Union[float, NoneType] = None, algorithm_parameters: Union[geneticalgorithm2.data_types.algorithm_params.AlgorithmParams, Dict[str, Any]] = AlgorithmParams(max_num_iteration=None, max_iteration_without_improv=None, population_size=100, mutation_probability=0.1, mutation_discrete_probability=None, crossover_probability=None, elit_ratio=0.04, parents_portion=0.3, crossover_type='uniform', mutation_type='uniform_by_center', mutation_discrete_type='uniform_discrete', selection_type='roulette'))
 76    def __init__(
 77        self,
 78        function: FunctionToMinimize = None,
 79
 80        dimension: int = 0,
 81        variable_type: Union[VARIABLE_TYPE, Sequence[VARIABLE_TYPE]] = 'bool',
 82        variable_boundaries: Optional[Union[array2D, Sequence[Tuple[float, float]]]] = None,
 83
 84        variable_type_mixed=None,
 85
 86        function_timeout: Optional[float] = None,
 87        algorithm_parameters: Union[AlgorithmParams, Dict[str, Any]] = default_params
 88    ):
 89        """
 90        initializes the GA object and performs main checks
 91
 92        Args:
 93            function: the given objective function to be minimized -- deprecated and moved to run() method
 94            dimension: the number of decision variables, the population samples dimension
 95
 96            variable_type: string means the variable type for all variables,
 97                for mixed types use sequence of strings of type for each variable
 98
 99            variable_boundaries: leave it None if variable_type is 'bool';
100                otherwise provide a sequence of tuples of length two as boundaries for each variable;
101                the length of the array must be equal dimension.
102                For example, ([0,100], [0,200]) determines
103                    lower boundary 0 and upper boundary 100 for first
104                    and upper boundary 200 for second variable
105                    and dimension must be 2.
106
107            variable_type_mixed -- deprecated
108
109            function_timeout: if the given function does not provide
110                output before function_timeout (unit is seconds) the algorithm raises error.
111                For example, when there is an infinite loop in the given function.
112                `None` means disabling -- deprecated and moved to run()
113
114            algorithm_parameters: AlgorithmParams object or usual dictionary with algorithm parameter;
115                it is not mandatory to provide all possible parameters
116
117        Notes:
118            - This implementation minimizes the given objective function.
119            For maximization u can multiply the function by -1 (for instance): the absolute
120                value of the output would be the actual objective function
121
122        for more details and examples of implementation please visit:
123            https://github.com/PasaOpasen/geneticalgorithm2
124  
125        """
126
127        # all default fields
128
129        # self.crossover: Callable[[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray]] = None
130        # self.real_mutation: Callable[[float, float, float], float] = None
131        # self.discrete_mutation: Callable[[int, int, int], int] = None
132        # self.selection: Callable[[np.ndarray, int], np.ndarray] = None
133
134        self.result = None
135        self.best_function = None
136
137        self.revolution_oppositor = None
138        self.dup_oppositor = None
139        self.creator = None
140        self.init_oppositors = None
141
142        self.f: Callable[[array1D], float] = None
143        self.funtimeout: float = None
144
145        self.set_function: Callable[[np.ndarray], np.ndarray] = None
146
147        # self.dim: int = None
148        self.var_bounds: List[Tuple[Union[int, float], Union[int, float]]] = None
149        # self.indexes_int: np.ndarray = None
150        # self.indexes_float: np.ndarray = None
151
152        self.checked_reports: List[Tuple[str, Callable[[array1D], None]]] = None
153
154        self.population_size: int = None
155        self.progress_stream = None
156
157        # input algorithm's parameters
158
159        assert isinstance(algorithm_parameters, (dict, AlgorithmParams)), (
160            "algorithm_parameters must be dict or AlgorithmParams object"
161        )
162        if not isinstance(algorithm_parameters, AlgorithmParams):
163            algorithm_parameters = AlgorithmParams.from_dict(algorithm_parameters)
164        algorithm_parameters.validate()
165        self.param = algorithm_parameters
166
167        self.crossover, self.real_mutation, self.discrete_mutation, self.selection = algorithm_parameters.get_CMS_funcs()
168
169        # dimension and area bounds
170        self.dim = int(dimension)
171        assert self.dim > 0, f"dimension count must be int and >0, gotten {dimension}"
172
173        if variable_type_mixed is not None:
174            warnings.warn(
175                f"argument variable_type_mixed is deprecated and will be removed at version 7\n "
176                f"use variable_type={tuple(variable_type_mixed)} instead"
177            )
178            variable_type = variable_type_mixed
179        self._set_types_indexes(variable_type)  # types indexes
180        self._set_var_boundaries(variable_type, variable_boundaries)  # input variables' boundaries
181
182        # fix mutation probs
183
184        assert can_be_prob(self.param.mutation_probability)
185        self.prob_mut = self.param.mutation_probability
186        assert self.param.mutation_discrete_probability is None or can_be_prob(self.param.mutation_discrete_probability)
187        self.prob_mut_discrete = self.param.mutation_discrete_probability or self.prob_mut
188
189        if self.param.crossover_probability is not None:
190            warnings.warn(
191                f"crossover_probability is deprecated and will be removed in version 7. "
192                f"Reason: it's old and has no sense"
193            )
194
195        #############################################################
196        # input function
197        if function:
198            warnings.warn(
199                f"function is deprecated in init constructor and will be removed in version 7. "
200                f"Move this argument to run() method"
201            )
202            self._check_function(function)
203        if function_timeout:
204            warnings.warn(
205                f"function_timeout is deprecated in init constructor and will be removed in version 7. "
206                f"Move this argument to run() method"
207            )
208            self._check_function_timeout(function_timeout)
209
210        #############################################################
211        
212        self.population_size = int(self.param.population_size)
213        self._set_parents_count(self.param.parents_portion)
214        self._set_elit_count(self.population_size, self.param.elit_ratio)
215        assert self.parents_count >= self.elit_count, (
216            f"\n number of parents ({self.parents_count}) "
217            f"must be greater than number of elits ({self.elit_count})"
218        )
219
220        self._set_max_iterations()
221
222        self._set_report()
223
224        # specify this function to speed up or change default behaviour
225        self.fill_children: Optional[Callable[[array2D, int], None]] = None
226        """
227        custom function which adds children for population POP 
228            where POP[:parents_count] are parents lines and next lines are for children
229        """

initializes the GA object and performs main checks

Arguments:
  • function: the given objective function to be minimized -- deprecated and moved to run() method
  • dimension: the number of decision variables, the population samples dimension
  • variable_type: string means the variable type for all variables, for mixed types use sequence of strings of type for each variable
  • variable_boundaries: leave it None if variable_type is 'bool'; otherwise provide a sequence of tuples of length two as boundaries for each variable; the length of the array must be equal dimension. For example, ([0,100], [0,200]) determines lower boundary 0 and upper boundary 100 for first and upper boundary 200 for second variable and dimension must be 2.
  • variable_type_mixed -- deprecated
  • function_timeout: if the given function does not provide output before function_timeout (unit is seconds) the algorithm raises error. For example, when there is an infinite loop in the given function. None means disabling -- deprecated and moved to run()
  • algorithm_parameters: AlgorithmParams object or usual dictionary with algorithm parameter; it is not mandatory to provide all possible parameters
Notes:
  • This implementation minimizes the given objective function. For maximization u can multiply the function by -1 (for instance): the absolute value of the output would be the actual objective function

for more details and examples of implementation please visit: https://github.com/PasaOpasen/geneticalgorithm2

default_params = AlgorithmParams(max_num_iteration=None, max_iteration_without_improv=None, population_size=100, mutation_probability=0.1, mutation_discrete_probability=None, crossover_probability=None, elit_ratio=0.04, parents_portion=0.3, crossover_type='uniform', mutation_type='uniform_by_center', mutation_discrete_type='uniform_discrete', selection_type='roulette')
PROGRESS_BAR_LEN = 20

max count of symbols in the progress bar

output_dict
61    @property
62    def output_dict(self):
63        warnings.warn(
64            "'output_dict' is deprecated and will be removed at version 7 \n"
65            "use 'result' instead"
66        )
67        return self.result
needs_mutation: bool
69    @property
70    def needs_mutation(self) -> bool:
71        """whether the mutation is required"""
72        return self.prob_mut > 0 or self.prob_mut_discrete > 0

whether the mutation is required

result
best_function
revolution_oppositor
dup_oppositor
creator
init_oppositors
f: Callable[[numpy.ndarray], float]
funtimeout: float
set_function: Callable[[numpy.ndarray], numpy.ndarray]
var_bounds: List[Tuple[Union[int, float], Union[int, float]]]
checked_reports: List[Tuple[str, Callable[[numpy.ndarray], NoneType]]]
population_size: int
progress_stream
param
dim
prob_mut
prob_mut_discrete
fill_children: Union[Callable[[numpy.ndarray, int], NoneType], NoneType]

custom function which adds children for population POP where POP[:parents_count] are parents lines and next lines are for children

def run( self, no_plot: bool = False, disable_printing: bool = False, progress_bar_stream: Union[str, NoneType] = 'stdout', disable_progress_bar: bool = False, function: Callable[[numpy.ndarray], float] = None, function_timeout: Union[float, NoneType] = None, set_function: Callable[[numpy.ndarray], numpy.ndarray] = None, apply_function_to_parents: bool = False, start_generation: Union[geneticalgorithm2.data_types.generation.Generation, str, Dict[Literal['population', 'scores'], numpy.ndarray], numpy.ndarray, Tuple[Union[numpy.ndarray, NoneType], Union[numpy.ndarray, NoneType]]] = Generation(variables=None, scores=None), studEA: bool = False, mutation_indexes: Union[Iterable[int], NoneType] = None, init_creator: Union[Callable[[], numpy.ndarray], NoneType] = None, init_oppositors: Union[Sequence[Callable[[numpy.ndarray], numpy.ndarray]], NoneType] = None, duplicates_oppositor: Union[Callable[[numpy.ndarray], numpy.ndarray], NoneType] = None, remove_duplicates_generation_step: Union[int, NoneType] = None, revolution_oppositor: Union[Callable[[numpy.ndarray], numpy.ndarray], NoneType] = None, revolution_after_stagnation_step: Union[int, NoneType] = None, revolution_part: float = 0.3, population_initializer: Tuple[int, Callable[[numpy.ndarray, numpy.ndarray], Tuple[numpy.ndarray, numpy.ndarray]]] = (1, <function get_population_initializer.<locals>.process_population>), stop_when_reached: Union[float, NoneType] = None, callbacks: Union[Sequence[Callable[[int, List[float], numpy.ndarray, numpy.ndarray], NoneType]], NoneType] = None, middle_callbacks: Union[Sequence[Callable[[geneticalgorithm2.callbacks.data.MiddleCallbackData], Tuple[geneticalgorithm2.callbacks.data.MiddleCallbackData, bool]]], NoneType] = None, time_limit_secs: Union[float, NoneType] = None, save_last_generation_as: Union[str, NoneType] = None, seed: Union[int, NoneType] = None) -> geneticalgorithm2.data_types.result.GAResult:
 445    def run(
 446        self,
 447        no_plot: bool = False,
 448        disable_printing: bool = False,
 449        progress_bar_stream: Optional[str] = 'stdout',
 450
 451        # deprecated
 452        disable_progress_bar: bool = False,
 453
 454        function: FunctionToMinimize = None,
 455        function_timeout: Optional[float] = None,
 456
 457        set_function: SetFunctionToMinimize = None,
 458        apply_function_to_parents: bool = False,
 459        start_generation: GenerationConvertible = Generation(),
 460        studEA: bool = False,
 461        mutation_indexes: Optional[Iterable[int]] = None,
 462
 463        init_creator: Optional[CreatorFunc] = None,
 464        init_oppositors: Optional[Sequence[OppositorFunc]] = None,
 465
 466        duplicates_oppositor: Optional[OppositorFunc] = None,
 467        remove_duplicates_generation_step: Optional[int] = None,
 468
 469        revolution_oppositor: Optional[OppositorFunc] = None,
 470        revolution_after_stagnation_step: Optional[int] = None,
 471        revolution_part: float = 0.3,
 472
 473        population_initializer: Tuple[
 474            int, PopulationModifier
 475        ] = get_population_initializer(select_best_of=1, local_optimization_step='never', local_optimizer=None),
 476
 477        stop_when_reached: Optional[float] = None,
 478        callbacks: Optional[Sequence[SimpleCallbackFunc]] = None,
 479        middle_callbacks: Optional[Sequence[MiddleCallbackFunc]] = None,  #+
 480        time_limit_secs: Optional[float] = None,
 481        save_last_generation_as: Optional[str] = None,
 482        seed: Optional[int] = None
 483    ) -> GAResult:
 484        """
 485        runs optimization process
 486
 487        Args:
 488            no_plot: do not plot results using matplotlib by default
 489
 490            disable_printing: do not print log info of optimization process (except progress bar)
 491
 492            progress_bar_stream: 'stdout', 'stderr' or None to disable progress bar;
 493                disabling progress bar can speed up the optimization process in many cases
 494
 495            disable_progress_bar: deprecated
 496
 497            function: the given objective function (sample -> its score) to be minimized;
 498
 499            function_timeout: if the given function does not provide
 500                output before function_timeout (unit is seconds) the algorithm raises error.
 501                For example, when there is an infinite loop in the given function.
 502                `None` means disabling
 503
 504            set_function: set function (all samples -> score per sample) to be used instead of usual function
 505                (usually for optimization purposes)
 506
 507            apply_function_to_parents: whether to apply function to parents from previous generation (if it's needed), 
 508                it can be needed at working with games agents, but for other tasks will just waste time
 509
 510            start_generation: initial generation object of any `GenerationConvertible` type
 511
 512            studEA: using stud EA strategy (crossover with best object always)
 513
 514            mutation_indexes: indexes of dimensions where mutation can be performed (all dimensions by default)
 515
 516            init_creator: the function creates population samples.
 517                By default -- random uniform for real variables and random uniform for int
 518            init_oppositors: the list of oppositors creates oppositions for base population. No by default
 519
 520            duplicates_oppositor: oppositor for applying after duplicates removing.
 521                By default -- using just random initializer from creator
 522            remove_duplicates_generation_step: step for removing duplicates (have a sense with discrete tasks).
 523                No by default
 524
 525            revolution_oppositor: oppositor for revolution time. No by default
 526            revolution_after_stagnation_step: create revolution after this generations of stagnation. No by default
 527            revolution_part: float, the part of generation to being oppose. By default is 0.3
 528
 529            population_initializer: object for actions at population initialization step
 530                to create better start population. See doc
 531
 532            stop_when_reached: stop searching after reaching this value 
 533                (it can be potential minimum or something else)
 534
 535            callbacks: sequence of callback functions with structure:
 536                (generation_number, report_list, last_population, last_scores) -> do some action
 537
 538            middle_callbacks: sequence of functions made by `MiddleCallback` class
 539
 540            time_limit_secs: limit time of working (in seconds);
 541                `None` means no time limit (limit will be only for count of generation and so on)
 542
 543            save_last_generation_as: path to .npz file 
 544                for saving last_generation as numpy dictionary like
 545                {'population': 2D-array, 'scores': 1D-array}; 
 546                None disables this option
 547
 548            seed: random seed (None if doesn't matter)
 549
 550        Returns:
 551            `GAResult` object;
 552            also fills the self.report and self.result with many report information
 553
 554        Notes:
 555            - if `function_timeout` is enabled then `function` must be set
 556            - it would be more logical to use params like `studEA` as an algorithm parameter, 
 557                but `run()`-way can be more convenient for real using
 558        """
 559
 560        if disable_progress_bar:
 561            warnings.warn(
 562                f"disable_progress_bar is deprecated and will be removed in version 7, "
 563                f"use probress_bar_stream=None to disable progress bar"
 564            )
 565            progress_bar_stream = None
 566
 567        enable_printing: bool = not disable_printing
 568
 569        start_generation = Generation.from_object(self.dim, start_generation)
 570
 571        assert is_current_gen_number(revolution_after_stagnation_step), "must be None or int and >0"
 572        assert is_current_gen_number(remove_duplicates_generation_step), "must be None or int and >0"
 573        assert can_be_prob(revolution_part), f"revolution_part must be in [0,1], not {revolution_part}"
 574        assert stop_when_reached is None or isinstance(stop_when_reached, (int, float))
 575        assert isinstance(callbacks, collections.abc.Sequence) or callbacks is None, (
 576            "callbacks should be a list of callbacks functions"
 577        )
 578        assert isinstance(middle_callbacks, collections.abc.Sequence) or middle_callbacks is None, (
 579            "middle_callbacks should be list of MiddleCallbacks functions"
 580        )
 581        assert time_limit_secs is None or time_limit_secs > 0, 'time_limit_secs must be None of number > 0'
 582
 583        self._set_mutation_indexes(mutation_indexes)
 584        from OppOpPopInit import set_seed
 585        set_seed(seed)
 586
 587        # randomstate = np.random.default_rng(random.randint(0, 1000) if seed is None else seed)
 588        # self.randomstate = randomstate
 589
 590        # using bool flag is faster than using empty function with generated string arguments
 591        SHOW_PROGRESS = progress_bar_stream is not None
 592        if SHOW_PROGRESS:
 593
 594            show_progress = lambda t, t2, s: self._progress(t, t2, status=s)
 595
 596            if progress_bar_stream == 'stdout':
 597                self.progress_stream = sys.stdout
 598            elif progress_bar_stream == 'stderr':
 599                self.progress_stream = sys.stderr
 600            else:
 601                raise Exception(
 602                    f"wrong value {progress_bar_stream} of progress_bar_stream, must be 'stdout'/'stderr'/None"
 603                )
 604        else:
 605            show_progress = None
 606
 607        stop_by_val = (
 608            (lambda best_f: False)
 609            if stop_when_reached is None
 610            else (lambda best_f: best_f <= stop_when_reached)
 611        )
 612
 613        t: int = 0
 614        count_stagnation: int = 0
 615        pop: array2D = None
 616        scores: array1D = None
 617
 618        #region CALLBACKS
 619
 620        def get_data():
 621            """
 622            returns all important data about model
 623            """
 624            return MiddleCallbackData(
 625                last_generation=Generation(pop, scores),
 626                current_generation=t,
 627                report_list=self.report,
 628
 629                mutation_prob=self.prob_mut,
 630                mutation_discrete_prob=self.prob_mut_discrete,
 631                mutation=self.real_mutation,
 632                mutation_discrete=self.discrete_mutation,
 633                crossover=self.crossover,
 634                selection=self.selection,
 635
 636                current_stagnation=count_stagnation,
 637                max_stagnation=self.max_stagnations,
 638
 639                parents_portion=self.param.parents_portion,
 640                elit_ratio=self.param.elit_ratio,
 641
 642                set_function=self.set_function,
 643
 644                reason_to_stop=reason_to_stop
 645            )
 646
 647        def set_data(data: MiddleCallbackData):
 648            """
 649            sets data to model
 650            """
 651            nonlocal pop, scores, count_stagnation, reason_to_stop
 652
 653            pop, scores = data.last_generation.variables, data.last_generation.scores
 654            self.population_size = pop.shape[0]
 655
 656            self.param.parents_portion = data.parents_portion
 657            self._set_parents_count(data.parents_portion)
 658
 659            self.param.elit_ratio = data.elit_ratio
 660            self._set_elit_count(self.population_size, data.elit_ratio)
 661
 662            self.prob_mut = data.mutation_prob
 663            self.prob_mut_discrete = data.mutation_discrete_prob
 664            self.real_mutation = data.mutation
 665            self.discrete_mutation = data.mutation_discrete
 666            self.crossover = data.crossover
 667            self.selection = data.selection
 668
 669            count_stagnation = data.current_stagnation
 670            reason_to_stop = data.reason_to_stop
 671            self.max_stagnations = data.max_stagnation
 672
 673            self.set_function = data.set_function
 674
 675        if not callbacks:
 676            total_callback = lambda g, r, lp, ls: None
 677        else:
 678            def total_callback(
 679                generation_number: int,
 680                report_list: List[float],
 681                last_population: array2D,
 682                last_scores: array1D
 683            ):
 684                for cb in callbacks:
 685                    cb(generation_number, report_list, last_population, last_scores)
 686
 687        if not middle_callbacks:
 688            total_middle_callback = lambda: None
 689        else:
 690            def total_middle_callback():
 691                """
 692                applies callbacks and sets new data if there is a sense
 693                """
 694                data = get_data()
 695                flag = False
 696                for cb in middle_callbacks:
 697                    data, has_sense = cb(data)
 698                    if has_sense:
 699                        flag = True
 700                if flag:
 701                    set_data(data)  # update global date if there was real callback step
 702
 703        #endregion
 704
 705        start_time = time.time()
 706        time_is_done = (
 707            (lambda: False)
 708            if time_limit_secs is None
 709            else (lambda: int(time.time() - start_time) >= time_limit_secs)
 710        )
 711
 712        # combine with deprecated parts
 713        function = function or self.f
 714        function_timeout = function_timeout or self.funtimeout
 715
 716        assert function or set_function, 'no function to minimize'
 717        if function:
 718            self._check_function(function)
 719        if function_timeout:
 720            self._check_function_timeout(function_timeout)
 721
 722        self.set_function = set_function or GeneticAlgorithm2.default_set_function(self.f)
 723
 724        #region Initial population, duplicates filter, revolutionary
 725
 726        pop_coef, initializer_func = population_initializer
 727        
 728        # population creator by random or with oppositions
 729        if init_creator is None:
 730
 731            from OppOpPopInit import SampleInitializers
 732
 733            # just uniform random
 734            self.creator = SampleInitializers.Combined(
 735                minimums=[v[0] for v in self.var_bounds],
 736                maximums=[v[1] for v in self.var_bounds],
 737                indexes=(
 738                    self.indexes_int,
 739                    self.indexes_float
 740                ),
 741                creator_initializers=(
 742                    SampleInitializers.RandomInteger,
 743                    SampleInitializers.Uniform
 744                )
 745            )
 746        else:
 747            assert callable(init_creator)
 748            self.creator = init_creator
 749
 750        self.init_oppositors = init_oppositors
 751        self.dup_oppositor = duplicates_oppositor
 752        self.revolution_oppositor = revolution_oppositor
 753
 754        # event for removing duplicates
 755        if remove_duplicates_generation_step is None:
 756            def remover(pop: array2D, scores: array1D, gen: int) -> Tuple[
 757                array2D,
 758                array1D
 759            ]:
 760                return pop, scores
 761        else:
 762            
 763            def without_dup(pop: array2D, scores: array1D):  # returns population without dups
 764                _, index_of_dups = np.unique(pop, axis=0, return_index=True)
 765                return pop[index_of_dups], scores[index_of_dups], scores.size - index_of_dups.size
 766            
 767            if self.dup_oppositor is None:  # if there is no dup_oppositor, use random creator
 768                def remover(pop: array2D, scores: array1D, gen: int) -> Tuple[
 769                    array2D,
 770                    array1D
 771                ]:
 772                    if gen % remove_duplicates_generation_step != 0:
 773                        return pop, scores
 774
 775                    pp, sc, count_to_create = without_dup(pop, scores)  # pop without dups
 776                    
 777                    if count_to_create == 0:
 778                        if SHOW_PROGRESS:
 779                            show_progress(t, self.max_iterations,
 780                                      f"GA is running...{t} gen from {self.max_iterations}. No dups!")
 781                        return pop, scores
 782
 783                    pp2 = SampleInitializers.CreateSamples(self.creator, count_to_create)  # new pop elements
 784                    pp2_scores = self.set_function(pp2)  # new elements values
 785
 786                    if SHOW_PROGRESS:
 787                        show_progress(t, self.max_iterations,
 788                                      f"GA is running...{t} gen from {self.max_iterations}. Kill dups!")
 789                    
 790                    new_pop = np.vstack((pp, pp2))
 791                    new_scores = np.concatenate((sc, pp2_scores))
 792
 793                    args_to_sort = new_scores.argsort()
 794                    return new_pop[args_to_sort], new_scores[args_to_sort]
 795
 796            else:  # using oppositors
 797                assert callable(self.dup_oppositor)
 798
 799                def remover(pop: np.ndarray, scores: np.ndarray, gen: int) -> Tuple[
 800                    np.ndarray,
 801                    np.ndarray
 802                ]:
 803                    if gen % remove_duplicates_generation_step != 0:
 804                        return pop, scores
 805
 806                    pp, sc, count_to_create = without_dup(pop, scores)  # pop without dups
 807
 808                    if count_to_create == 0:
 809                        if SHOW_PROGRESS:
 810                            show_progress(t, self.max_iterations,
 811                                          f"GA is running...{t} gen from {self.max_iterations}. No dups!")
 812                        return pop, scores
 813
 814                    if count_to_create > sc.size:
 815                        raise Exception(
 816                            f"Too many duplicates at generation {gen} ({count_to_create} > {sc.size}), cannot oppose"
 817                        )
 818
 819                    # oppose count_to_create worse elements
 820                    pp2 = OppositionOperators.Reflect(pp[-count_to_create:], self.dup_oppositor)  # new pop elements
 821                    pp2_scores = self.set_function(pp2)  # new elements values
 822
 823                    if SHOW_PROGRESS:
 824                        show_progress(t, self.max_iterations,
 825                                      f"GA is running...{t} gen from {self.max_iterations}. Kill dups!")
 826
 827                    new_pop = np.vstack((pp, pp2))
 828                    new_scores = np.concatenate((sc, pp2_scores))
 829
 830                    args_to_sort = new_scores.argsort()
 831                    return new_pop[args_to_sort], new_scores[args_to_sort]
 832
 833        # event for revolution
 834        if revolution_after_stagnation_step is None:
 835            def revolution(pop: array2D, scores: array1D, stagnation_count: int) -> Tuple[
 836                array2D,
 837                array1D
 838            ]:
 839                return pop, scores
 840        else:
 841            if revolution_oppositor is None:
 842                raise Exception(
 843                    f"How can I make revolution each {revolution_after_stagnation_step} stagnation steps "
 844                    f"if revolution_oppositor is None (not defined)?"
 845                )
 846            assert callable(revolution_oppositor)
 847
 848            from OppOpPopInit import OppositionOperators
 849            
 850            def revolution(pop: array2D, scores: array1D, stagnation_count: int) -> Tuple[
 851                array2D,
 852                array1D
 853            ]:
 854                if stagnation_count < revolution_after_stagnation_step:
 855                    return pop, scores
 856                part = int(pop.shape[0]*revolution_part)
 857                
 858                pp2 = OppositionOperators.Reflect(pop[-part:], self.revolution_oppositor)
 859                pp2_scores = self.set_function(pp2)
 860
 861                combined = np.vstack((pop, pp2))
 862                combined_scores = np.concatenate((scores, pp2_scores))
 863                args = combined_scores.argsort()
 864
 865                if SHOW_PROGRESS:
 866                    show_progress(t, self.max_iterations,
 867                                  f"GA is running...{t} gen from {self.max_iterations}. Revolution!")
 868
 869                args = args[:scores.size]
 870                return combined[args], combined_scores[args]
 871
 872        #enregion
 873
 874        #
 875        #
 876        #  START ALGORITHM LOGIC
 877        #
 878        #
 879
 880        # Report
 881        self._clear_report()  # clear old report objects
 882        self._init_report()
 883
 884        # initialization of pop
 885        
 886        if start_generation.variables is None:
 887
 888            from OppOpPopInit import init_population
 889
 890            real_pop_size = self.population_size * pop_coef
 891
 892            # pop = np.empty((real_pop_size, self.dim))
 893            scores = np.empty(real_pop_size)
 894            
 895            pop = init_population(
 896                samples_count=real_pop_size,
 897                creator=self.creator,
 898                oppositors=self.init_oppositors
 899            )
 900
 901            if self.funtimeout and self.funtimeout > 0:  # perform simulation
 902            
 903                time_counter = 0
 904
 905                for p in range(0, real_pop_size):
 906                    # simulation returns exception or func value -- check the time of evaluating
 907                    value, eval_time = self._simulate(pop[p])
 908                    scores[p] = value
 909                    time_counter += eval_time
 910
 911                if enable_printing:
 912                    print(
 913                        f"\nSim: Average time of function evaluating (secs): "
 914                        f"{time_counter/real_pop_size} (total = {time_counter})\n"
 915                    )
 916            else:
 917
 918                eval_time = time.time()
 919                scores = self.set_function(pop)
 920                eval_time = time.time() - eval_time
 921                if enable_printing:
 922                    print(
 923                        f"\nSet: Average time of function evaluating (secs): "
 924                        f"{eval_time/real_pop_size} (total = {eval_time})\n"
 925                    )
 926                
 927        else:
 928
 929            self.population_size = start_generation.variables.shape[0]
 930            self._set_elit_count(self.population_size, self.param.elit_ratio)
 931            self._set_parents_count(self.param.parents_portion)
 932
 933            pop = start_generation.variables
 934
 935            if start_generation.scores is None:
 936
 937                _time = time.time()
 938                scores = self.set_function(pop)
 939                _time = time.time() - _time
 940
 941                if enable_printing:
 942                    print(
 943                        f'\nFirst scores are made from gotten variables '
 944                        f'(by {_time} secs, about {_time/scores.size} for each creature)\n'
 945                    )
 946            else:
 947                scores = start_generation.scores
 948                if enable_printing:
 949                    print(f"\nFirst scores are from gotten population\n")
 950
 951        # Initialization by select bests and local_descent
 952        
 953        pop, scores = initializer_func(pop, scores)
 954
 955        # first sort
 956        args_to_sort = scores.argsort()
 957        pop = pop[args_to_sort]
 958        scores = scores[args_to_sort]
 959        self._update_report(scores)
 960
 961        self.population_size = scores.size
 962        self.best_function = scores[0]
 963
 964        if enable_printing:
 965            print(
 966                f"Best score before optimization: {self.best_function}"
 967            )
 968
 969        t: int = 1
 970        count_stagnation: int = 0
 971        """iterations without progress"""
 972        reason_to_stop: Optional[str] = None
 973
 974        # gets indexes of 2 parents to crossover
 975        if studEA:
 976            get_parents_inds = lambda: (0, random.randrange(1, self.parents_count))
 977        else:
 978            get_parents_inds = lambda: random_indexes_pair(self.parents_count)
 979
 980        #  while no reason to stop
 981        while True:
 982
 983            if count_stagnation > self.max_stagnations:
 984                reason_to_stop = f"limit of fails: {count_stagnation}"
 985            elif t == self.max_iterations:
 986                reason_to_stop = f'limit of iterations: {t}'
 987            elif stop_by_val(self.best_function):
 988                reason_to_stop = f"stop value reached: {self.best_function} <= {stop_when_reached}"
 989            elif time_is_done():
 990                reason_to_stop = f'time is done: {time.time() - start_time} >= {time_limit_secs}'
 991
 992            if reason_to_stop is not None:
 993                if SHOW_PROGRESS:
 994                    show_progress(t, self.max_iterations, f"GA is running... STOP! {reason_to_stop}")
 995                break
 996
 997            if scores[0] < self.best_function:  # if there is progress
 998                count_stagnation = 0
 999                self.best_function = scores[0]
1000            else:
1001                count_stagnation += 1
1002
1003            if SHOW_PROGRESS:
1004                show_progress(
1005                    t,
1006                    self.max_iterations,
1007                    f"GA is running...{t} gen from {self.max_iterations}...best value = {self.best_function}"
1008                )
1009
1010            # Select parents
1011            
1012            par: array2D = np.empty((self.parents_count, self.dim))
1013            """samples chosen to create new samples"""
1014            par_scores: array1D = np.empty(self.parents_count)
1015
1016            elit_slice = slice(None, self.elit_count)
1017            """elit parents"""
1018
1019            # copy needs because the generation will be removed after parents selection
1020            par[elit_slice] = pop[elit_slice].copy()
1021            par_scores[elit_slice] = scores[elit_slice].copy()
1022
1023            new_par_inds = (
1024                self.selection(
1025                    scores[self.elit_count:],
1026                    self.parents_count - self.elit_count
1027                ).astype(np.int16) + self.elit_count
1028            )
1029            """non-elit parents indexes"""
1030            #new_par_inds = self.selection(scores, self.parents_count - self.elit_count).astype(np.int16)
1031            par_slice = slice(self.elit_count, self.parents_count)
1032            par[par_slice] = pop[new_par_inds].copy()
1033            par_scores[par_slice] = scores[new_par_inds].copy()
1034
1035            pop = np.empty((self.population_size, self.dim))
1036            """new generation"""
1037            scores = np.empty(self.population_size)
1038            """new generation scores"""
1039
1040            parents_slice = slice(None, self.parents_count)
1041            pop[parents_slice] = par
1042            scores[parents_slice] = par_scores
1043
1044            if self.fill_children is None:  # default fill children behaviour
1045                DO_MUTATION = self.needs_mutation
1046                for k in range(self.parents_count, self.population_size, 2):
1047
1048                    r1, r2 = get_parents_inds()
1049
1050                    pvar1 = pop[r1]  # equal to par[r1], but better for cache
1051                    pvar2 = pop[r2]
1052
1053                    ch1, ch2 = self.crossover(pvar1, pvar2)
1054
1055                    if DO_MUTATION:
1056                        ch1 = self.mut(ch1)
1057                        ch2 = self.mut_middle(ch2, pvar1, pvar2)
1058
1059                    pop[k] = ch1
1060                    pop[k+1] = ch2
1061            else:  # custom behaviour
1062                self.fill_children(pop, self.parents_count)
1063
1064            if apply_function_to_parents:
1065                scores = self.set_function(pop)
1066            else:
1067                scores[self.parents_count:] = self.set_function(pop[self.parents_count:])
1068            
1069            # remove duplicates
1070            pop, scores = remover(pop, scores, t)
1071            # revolution
1072            pop, scores = revolution(pop, scores, count_stagnation)
1073
1074            # make population better
1075            args_to_sort = scores.argsort()
1076            pop = pop[args_to_sort]
1077            scores = scores[args_to_sort]
1078            self._update_report(scores)
1079
1080            # callback it
1081            total_callback(t, self.report, pop, scores)
1082            total_middle_callback()
1083
1084            t += 1
1085
1086        self.best_function = fast_min(scores[0], self.best_function)
1087
1088        last_generation = Generation(pop, scores)
1089        self.result = GAResult(last_generation)
1090
1091        if save_last_generation_as is not None:
1092            last_generation.save(save_last_generation_as)
1093
1094        if enable_printing:
1095            show = ' ' * 200
1096            sys.stdout.write(
1097                f'\r{show}\n'
1098                f'\r The best found solution:\n {pop[0]}'
1099                f'\n\n Objective function:\n {self.best_function}\n'
1100                f'\n Used generations: {len(self.report)}'
1101                f'\n Used time: {time.time() - start_time:.3g} seconds\n'
1102            )
1103            sys.stdout.flush() 
1104        
1105        if not no_plot:
1106            self.plot_results()
1107
1108        if enable_printing:
1109            if reason_to_stop is not None and 'iterations' not in reason_to_stop:
1110                sys.stdout.write(
1111                    f'\nWarning: GA is terminated because of {reason_to_stop}'
1112                )
1113
1114        return self.result

runs optimization process

Arguments:
  • no_plot: do not plot results using matplotlib by default
  • disable_printing: do not print log info of optimization process (except progress bar)
  • progress_bar_stream: 'stdout', 'stderr' or None to disable progress bar; disabling progress bar can speed up the optimization process in many cases
  • disable_progress_bar: deprecated
  • function: the given objective function (sample -> its score) to be minimized;
  • function_timeout: if the given function does not provide output before function_timeout (unit is seconds) the algorithm raises error. For example, when there is an infinite loop in the given function. None means disabling
  • set_function: set function (all samples -> score per sample) to be used instead of usual function (usually for optimization purposes)
  • apply_function_to_parents: whether to apply function to parents from previous generation (if it's needed), it can be needed at working with games agents, but for other tasks will just waste time
  • start_generation: initial generation object of any GenerationConvertible type
  • studEA: using stud EA strategy (crossover with best object always)
  • mutation_indexes: indexes of dimensions where mutation can be performed (all dimensions by default)
  • init_creator: the function creates population samples. By default -- random uniform for real variables and random uniform for int
  • init_oppositors: the list of oppositors creates oppositions for base population. No by default
  • duplicates_oppositor: oppositor for applying after duplicates removing. By default -- using just random initializer from creator
  • remove_duplicates_generation_step: step for removing duplicates (have a sense with discrete tasks). No by default
  • revolution_oppositor: oppositor for revolution time. No by default
  • revolution_after_stagnation_step: create revolution after this generations of stagnation. No by default
  • revolution_part: float, the part of generation to being oppose. By default is 0.3
  • population_initializer: object for actions at population initialization step to create better start population. See doc
  • stop_when_reached: stop searching after reaching this value (it can be potential minimum or something else)
  • callbacks: sequence of callback functions with structure: (generation_number, report_list, last_population, last_scores) -> do some action
  • middle_callbacks: sequence of functions made by MiddleCallback class
  • time_limit_secs: limit time of working (in seconds); None means no time limit (limit will be only for count of generation and so on)
  • save_last_generation_as: path to .npz file for saving last_generation as numpy dictionary like {'population': 2D-array, 'scores': 1D-array}; None disables this option
  • seed: random seed (None if doesn't matter)
Returns:

GAResult object; also fills the self.report and self.result with many report information

Notes:
  • if function_timeout is enabled then function must be set
  • it would be more logical to use params like studEA as an algorithm parameter, but run()-way can be more convenient for real using
def plot_results( self, title: str = 'Genetic Algorithm', save_as: Union[str, NoneType] = None, dpi: int = 200, main_color: str = 'blue'):
1120    def plot_results(
1121        self,
1122        title: str = 'Genetic Algorithm',
1123        save_as: Optional[str] = None,
1124        dpi: int = 200,
1125        main_color: str = 'blue'
1126     ):
1127        """
1128        Simple plot of self.report (if not empty)
1129        """
1130        if len(self.report) == 0:
1131            sys.stdout.write("No results to plot!\n")
1132            return
1133
1134        plot_several_lines(
1135            lines=[self.report],
1136            colors=[main_color],
1137            labels=['best of generation'],
1138            linewidths=[2],
1139            title=title,
1140            xlabel='Generation',
1141            ylabel='Minimized function',
1142            save_as=save_as,
1143            dpi=dpi
1144        )

Simple plot of self.report (if not empty)

def plot_generation_scores( self, title: str = 'Last generation scores', save_as: Union[str, NoneType] = None):
1146    def plot_generation_scores(self, title: str = 'Last generation scores', save_as: Optional[str] = None):
1147        """
1148        Plots barplot of scores of last population
1149        """
1150
1151        if not hasattr(self, 'result'):
1152            raise Exception(
1153                "There is no 'result' field into ga object! Before plotting generation u need to run seaching process"
1154            )
1155
1156        plot_pop_scores(self.result.last_generation.scores, title, save_as)

Plots barplot of scores of last population

def mut(self, x: numpy.ndarray):
1161    def mut(self, x: array1D):
1162        """
1163        just mutation
1164        """
1165
1166        for i in self.indexes_int_mut:
1167            if random.random() < self.prob_mut_discrete:
1168                x[i] = self.discrete_mutation(x[i], *self.var_bounds[i])
1169
1170        for i in self.indexes_float_mut:                
1171            if random.random() < self.prob_mut:
1172                x[i] = self.real_mutation(x[i], *self.var_bounds[i])
1173            
1174        return x

just mutation

def mut_middle(self, x: numpy.ndarray, p1: numpy.ndarray, p2: numpy.ndarray):
1176    def mut_middle(self, x: array1D, p1: array1D, p2: array1D):
1177        """
1178        mutation oriented on parents
1179        """
1180        for i in self.indexes_int_mut:
1181
1182            if random.random() < self.prob_mut_discrete:
1183                v1, v2 = p1[i], p2[i]
1184                if v1 < v2:
1185                    x[i] = random.randint(v1, v2)
1186                elif v1 > v2:
1187                    x[i] = random.randint(v2, v1)
1188                else:
1189                    x[i] = random.randint(*self.var_bounds[i])
1190                        
1191        for i in self.indexes_float_mut:                
1192            if random.random() < self.prob_mut:
1193                v1, v2 = p1[i], p2[i]
1194                if v1 != v2:
1195                    x[i] = random.uniform(v1, v2)
1196                else:
1197                    x[i] = random.uniform(*self.var_bounds[i])
1198        return x

mutation oriented on parents

@staticmethod
def default_set_function(function_for_set: Callable[[numpy.ndarray], float]):
1204    @staticmethod
1205    def default_set_function(function_for_set: Callable[[array1D], float]):
1206        """
1207        simple function for creating set_function 
1208        function_for_set just applies to each row of population
1209        """
1210        def func(matrix: array2D):
1211            return np.array(
1212                [function_for_set(row) for row in matrix]
1213            )
1214        return func

simple function for creating set_function function_for_set just applies to each row of population

@staticmethod
def vectorized_set_function(function_for_set: Callable[[numpy.ndarray], float]):
1216    @staticmethod
1217    def vectorized_set_function(function_for_set: Callable[[array1D], float]):
1218        """
1219        works like default, but faster for big populations and slower for little
1220        function_for_set just applyes to each row of population
1221        """
1222
1223        func = np.vectorize(function_for_set, signature='(n)->()')
1224        return func

works like default, but faster for big populations and slower for little function_for_set just applyes to each row of population

@staticmethod
def set_function_multiprocess(function_for_set: Callable[[numpy.ndarray], float], n_jobs: int = -1):
1226    @staticmethod
1227    def set_function_multiprocess(function_for_set: Callable[[array1D], float], n_jobs: int = -1):
1228        """
1229        like function_for_set but uses joblib with n_jobs (-1 goes to count of available processors)
1230        """
1231        try:
1232            from joblib import Parallel, delayed
1233        except ModuleNotFoundError:
1234            raise ModuleNotFoundError(
1235                "this additional feature requires joblib package," 
1236                "run `pip install joblib` or `pip install --upgrade geneticalgorithm2[full]`" 
1237                "or use another set function"
1238            )
1239
1240        def func(matrix: array2D):
1241            result = Parallel(n_jobs=n_jobs)(delayed(function_for_set)(matrix[i]) for i in range(matrix.shape[0]))
1242            return np.array(result)
1243        return func

like function_for_set but uses joblib with n_jobs (-1 goes to count of available processors)