Skip to content

Commit

Permalink
Implementation of genome reorder function
Browse files Browse the repository at this point in the history
  • Loading branch information
HenrikMettler committed Nov 4, 2020
1 parent 14ab8aa commit 0f44a01
Show file tree
Hide file tree
Showing 7 changed files with 442 additions and 4 deletions.
15 changes: 14 additions & 1 deletion cgp/ea/mu_plus_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def __init__(
n_processes: int = 1,
local_search: Callable[[IndividualBase], None] = lambda combined: None,
k_local_search: Union[int, None] = None,
reorder_genome: bool = False,
):
"""Init function
Expand All @@ -44,13 +45,21 @@ def __init__(
k_local_search : int
Number of individuals in the whole population (parents +
offsprings) to apply local search to.
reorder_genome: bool, optional
Whether genome reordering should be applied.
Reorder shuffles the genotype of an individual without changing its phenotype,
thereby contributing to neutral drift through the genotypic search space.
If True, reorder is applied to each parents genome at every generation
before creating offsprings.
Defaults to True.
"""
self.n_offsprings = n_offsprings

self.tournament_size = tournament_size
self.n_processes = n_processes
self.local_search = local_search
self.k_local_search = k_local_search
self.reorder_genome = reorder_genome

def initialize_fitness_parents(
self, pop: Population, objective: Callable[[IndividualBase], IndividualBase]
Expand All @@ -71,7 +80,7 @@ def initialize_fitness_parents(
pop._parents = self._compute_fitness(pop.parents, objective)

def step(
self, pop: Population, objective: Callable[[IndividualBase], IndividualBase]
self, pop: Population, objective: Callable[[IndividualBase], IndividualBase],
) -> Population:
"""Perform one step in the evolution.
Expand All @@ -89,6 +98,10 @@ def step(
Population
Modified population with new parents.
"""

if self.reorder_genome:
pop.reorder_genome()

offsprings = self._create_new_offspring_generation(pop)

# we want to prefer offsprings with the same fitness as their
Expand Down
152 changes: 149 additions & 3 deletions cgp/genome.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, Generator, List, Optional, Tuple, Type, Union
from typing import Dict, Generator, List, Optional, Set, Tuple, Type, Union

import numpy as np

Expand Down Expand Up @@ -138,7 +138,7 @@ def __repr__(self) -> str:
return s

def determine_permissible_values_per_gene(self, gene_idx: int) -> np.ndarray:
region_idx = gene_idx // self._length_per_region
region_idx = self._get_region_idx(gene_idx)

if self._is_input_region(region_idx):
return self._determine_permissible_values_input_region(gene_idx)
Expand Down Expand Up @@ -267,6 +267,147 @@ def randomize(self, rng: np.random.RandomState) -> None:
# accept generated dna if it is valid
self.dna = dna

def reorder(self, rng: np.random.RandomState) -> None:
"""Reorder the genome
Shuffle node ordering of internal (hidden) nodes in genome without changing the phenotype.
(Goldman 2015, DOI: 10.1109/TEVC.2014.2324539)
During reordering, inactive genes, e.g., input genes of nodes with arity zero,
are not taken into account and can hence have invalid values after reordering.
These invalid values are replaced by random values
for the respective gene after reordering.
Parameters
----------
rng : numpy.RandomState
Random number generator instance.
Returns
----------
None
"""
if (self._n_rows != 1) or (self._levels_back != self._n_columns):
raise ValueError(
"Genome reordering is only implemented for n_rows=1" " and levels_back=n_columns"
)

dna = self._dna.copy()

node_dependencies: Dict[int, Set[int]] = self._determine_node_dependencies()

addable_nodes: Set[int] = self._get_addable_nodes(node_dependencies)

new_node_idx: int = self._n_inputs # First position to be placed is after inputs
used_node_indices: List[int] = []

while len(addable_nodes) > 0:

old_node_idx = rng.choice(list(addable_nodes))

dna = self._copy_dna_segment(dna, old_node_idx=old_node_idx, new_node_idx=new_node_idx)

for dependencies in node_dependencies.values():
dependencies.discard(old_node_idx)

used_node_indices.append(old_node_idx)
addable_nodes = self._get_addable_nodes(node_dependencies, used_node_indices)
new_node_idx += 1

self._update_input_genes(dna, used_node_indices)
self._replace_invalid_input_alleles(dna, rng)

self.dna = dna

def _copy_dna_segment(self, dna: List[int], old_node_idx: int, new_node_idx: int) -> List[int]:
""" Copy a nodes dna segment from its old node location to a new location. """

dna[
new_node_idx * self._length_per_region : (new_node_idx + 1) * self._length_per_region
] = self._dna[
old_node_idx * self._length_per_region : (old_node_idx + 1) * self._length_per_region
]

return dna

def _update_input_genes(self, dna: List[int], used_node_indices: List[int]) -> None:
"""Update input genes of all nodes from old node indices to new node indices"""
for gene_idx, gene_value in enumerate(dna):
region_idx = self._get_region_idx(gene_idx)
if self._is_hidden_input_gene(gene_idx, region_idx) or self._is_output_input_gene(
gene_idx
):
if gene_value >= self._n_inputs:
gene_value = self._n_inputs + used_node_indices.index(gene_value)
dna[gene_idx] = gene_value

def _replace_invalid_input_alleles(self, dna: List[int], rng: np.random.RandomState) -> None:
"""Replace invalid alleles for unused input genes of all nodes
by random permissible values.
WARNING: Works only if self.n_rows==1.
"""
assert self._n_rows == 1

for gene_idx, gene_value in enumerate(dna):
region_idx = self._get_region_idx(gene_idx)
if self._is_hidden_input_gene(gene_idx, region_idx) and gene_value > region_idx:
permissible_values = self.determine_permissible_values_per_gene(gene_idx)
gene_value = rng.choice(permissible_values)
dna[gene_idx] = gene_value

def _get_addable_nodes(
self, node_dependencies: Dict[int, Set[int]], used_node_indices: List[int] = [],
) -> Set[int]:
""" Get the set of addable nodes,
nodes which have no dependencies and were not already used.
"""
addable_nodes = set(
idx for idx, dependencies in node_dependencies.items() if len(dependencies) == 0
)
return addable_nodes.difference(used_node_indices)

def _get_region_idx(self, gene_idx: int) -> int:

return gene_idx // self._length_per_region

def _determine_node_dependencies(self) -> Dict[int, Set[int]]:
""" Determines the set of node indices on which each node depends.
Unused input genes are ignored.
Returns
----
dependencies: Dict[int, Set[int]]
Dictionary containing for every node the set of active input genes
"""
dependencies: Dict[int, Set[int]] = {}
for region_idx, _ in self.iter_hidden_regions():

current_node_dependencies: Set[int] = set()

operator_idx: int = region_idx * self._length_per_region

current_arity: int = self._determine_operator_arity(operator_idx)

for idx_gene in range(
1, current_arity + 1
): # shift by 1 since first gene is the operator gene
input_node_idx = self._dna[operator_idx + idx_gene]
if not self._is_input_region(
input_node_idx
): # not necessary to add input regions, since their positions remain fixed
current_node_dependencies.add(input_node_idx)

dependencies[region_idx] = current_node_dependencies

return dependencies

def _determine_operator_arity(self, gene_idx: int) -> int:

assert self._is_function_gene(gene_idx)

return self._primitives[self._dna[gene_idx]]._arity

def _permissible_inputs(self, region_idx: int) -> List[int]:

assert not self._is_input_region(region_idx)
Expand Down Expand Up @@ -403,6 +544,11 @@ def _is_function_gene(self, gene_idx: int) -> bool:
def _is_hidden_input_gene(self, gene_idx: int, region_idx: int) -> bool:
return self._is_hidden_region(region_idx) and (not self._is_function_gene(gene_idx))

def _is_output_input_gene(self, gene_idx: int) -> bool:
return (
self._is_gene_in_output_region(gene_idx) and gene_idx % self._length_per_region == 1
) # assumes 2nd gene in output is coding for input

def _select_gene_indices_for_mutation(
self, mutation_rate: float, rng: np.random.RandomState
) -> List[int]:
Expand Down Expand Up @@ -450,7 +596,7 @@ def mutate(self, mutation_rate: float, rng: np.random.RandomState):

for (gene_idx, allele) in zip(selected_gene_indices, np.array(dna)[selected_gene_indices]):

region_idx = gene_idx // self._length_per_region
region_idx = self._get_region_idx(gene_idx)

permissible_values = self._permissible_values[gene_idx]
permissible_alternative_values = permissible_values[permissible_values != allele]
Expand Down
1 change: 1 addition & 0 deletions cgp/hl_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def evolve(
parallel evaluation of the objective is supported. Currently
not implemented. Defaults to 1.
Returns
-------
None
Expand Down
14 changes: 14 additions & 0 deletions cgp/individual.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ def mutate(self, mutation_rate, rng):
def randomize_genome(self, rng):
raise NotImplementedError()

def reorder_genome(self, rng):
raise NotImplementedError()

def to_func(self):
raise NotImplementedError()

Expand All @@ -84,6 +87,10 @@ def _mutate_genome(genome: Genome, mutation_rate: float, rng: np.random.RandomSt
def _randomize_genome(genome: Genome, rng: np.random.RandomState) -> None:
genome.randomize(rng)

@staticmethod
def _reorder_genome(genome: Genome, rng: np.random.RandomState) -> None:
genome.reorder(rng)

@staticmethod
def _to_func(genome: Genome) -> Callable[[List[float]], List[float]]:
return CartesianGraph(genome).to_func()
Expand Down Expand Up @@ -137,6 +144,9 @@ def mutate(self, mutation_rate: float, rng: np.random.RandomState) -> None:
def randomize_genome(self, rng: np.random.RandomState) -> None:
self._randomize_genome(self.genome, rng)

def reorder_genome(self, rng: np.random.RandomState) -> None:
self._reorder_genome(self.genome, rng)

def to_func(self) -> Callable[[List[float]], List[float]]:
return self._to_func(self.genome)

Expand Down Expand Up @@ -186,6 +196,10 @@ def randomize_genome(self, rng: np.random.RandomState) -> None:
for g in self.genome:
self._randomize_genome(g, rng)

def reorder_genome(self, rng: np.random.RandomState) -> None:
for g in self.genome:
self._reorder_genome(g, rng)

def to_func(self) -> List[Callable[[List[float]], List[float]]]:
return [self._to_func(g) for g in self.genome]

Expand Down
10 changes: 10 additions & 0 deletions cgp/population.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,13 @@ def fitness_parents(self) -> List[Union[None, float]]:
List of fitness values for all parents.
"""
return [ind.fitness for ind in self._parents]

def reorder_genome(self) -> None:
""" Reorders the genome for all parents in the population
Returns
---------
None
"""
for parent in self.parents:
parent.reorder_genome(self.rng)
Loading

0 comments on commit 0f44a01

Please sign in to comment.