Source code for acat.ga.multitasking

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Implementation for evolutionary multitasking (EM)"""
from math import tanh
from operator import itemgetter
from collections import defaultdict
from ase.ga.population import Population
from ase.ga.convergence import Convergence
from ase.ga import get_raw_score
import numpy as np


class MultitaskPopulation(Population):
    """Different tasks are assigned to different niches. The candidates
    are ranked according to the effective fitness, given by the shortest
    distance between the raw score of the marked niche and the upper
    envelope after adding the individual. The raw score is given by the
    fitness gain in the maximum-gained niche.

    **The raw scores of each configuration for all tasks must be
    provided as a Numpy array in atoms.info['data']['raw_scores']**.

    After providing the raw scores, **the effective score of each
    configuration is automatically calculated and stored in
    atoms.info['key_value_pairs']['raw_score']**. The dominating niche
    of each configuration is stored in
    atoms.info['key_value_pairs']['dominating_niche'], the best niche
    (i.e. the niche closest to the upper envelope) is stored in
    atoms.info['key_value_pairs']['best_niche'], and the niches that are
    dominated by the dominating niche are stored in
    atoms.info['data']['niches'].

    Parameters
    ----------
    num_tasks : int
        The number of tasks.

    exp_function : bool, default True
        If True, use an exponential function for ranking the fitness.
        If False, use the same scheme as in Population.

    exp_prefactor : float, default 0.5
        The prefactor used in the exponential fitness scaling function.

    """

    def __init__(self, data_connection, population_size, num_tasks,
                 comparator=None, logfile=None, use_extinct=False,
                 exp_function=True, exp_prefactor=0.5, rng=np.random):
        self.exp_function = exp_function
        self.exp_prefactor = exp_prefactor
        self.vf = lambda x: x.info['key_value_pairs']['dominating_niche']
        # The current fitness is set at each update of the population
        self.current_fitness = None

        Population.__init__(self, data_connection, population_size,
                            comparator, logfile, use_extinct, rng=rng)

        self.max_scores = np.full(num_tasks, -np.inf, dtype=float)
        self.dominating_niches = np.full(num_tasks, -1, dtype=int)
        self.rep_no_gain = 0
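    # A minimal usage sketch (assumptions: the database file name 'gadb.db',
    # the number of tasks and the per-task scores s1..s4 are hypothetical
    # placeholders, not defined in this module):
    #
    #     import numpy as np
    #     from ase.ga.data import DataConnection
    #     from acat.ga.multitasking import MultitaskPopulation
    #
    #     db = DataConnection('gadb.db')
    #     pop = MultitaskPopulation(data_connection=db,
    #                               population_size=20,
    #                               num_tasks=4)
    #
    #     # Each relaxed candidate must carry one raw score per task
    #     # before it is passed to pop.update():
    #     atoms.info['data']['raw_scores'] = np.array([s1, s2, s3, s4])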
    def get_rank(self, candidates):
        rank = np.array([-1] * len(candidates))

        # Remember the order when decreasing rank later
        order = dict((candidates[i].info['key_value_pairs']['gaid'], i)
                     for i in range(len(candidates)))

        # Group candidates into niches according to the variable
        # function vf and sort them according to raw score
        self.set_vf_dict(candidates, key=get_raw_score, reverse=True)

        # Decrease the rank of the candidates that are not the best
        # in each niche
        for vf, li in self.vf_dict.items():
            for i, c in enumerate(li):
                rank[order[c.info['key_value_pairs']['gaid']]] -= i

        return rank
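    # Worked illustration (assumed values): if one niche holds three
    # candidates sorted by raw score, they receive ranks -1, -2 and -3,
    # while the single candidate of another niche keeps rank -1. Less
    # negative ranks therefore mark the fitter candidates within and
    # across niches.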
    def set_vf_dict(self, candidates, **sort_arguments):
        d = defaultdict(list)
        for c in candidates:
            d[self.vf(c)].append(c)
        if sort_arguments:
            for cl in d.values():
                cl.sort(**sort_arguments)
        self.vf_dict = d
    def __get_fitness__(self, candidates):
        expf = self.exp_function
        rfit = self.get_rank(candidates)

        if not expf:
            rmax = max(rfit)
            rmin = min(rfit)
            T = rmin - rmax
            # If using obj_rank probability, we must have a non-zero T
            # value, i.e. pop_size must be greater than the number of
            # permutations. We test for this here.
            msg = "Equal fitness for best and worst candidate in the "
            msg += "population! Fitness scaling is impossible! "
            msg += "Try with a larger population."
            assert T != 0., msg
            return 0.5 * (1. - np.tanh(2. * (rfit - rmax) / T - 1.))
        else:
            return self.exp_prefactor ** (-rfit - 1)
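    # Numeric illustration (assumed ranks, default exp_prefactor=0.5): for
    # rfit = [-1, -2, -3] the exponential scaling 0.5**(-rfit - 1) yields
    # [1.0, 0.5, 0.25], i.e. the fitness halves with every rank step. With
    # exp_function=False the tanh expression instead gives roughly 0.88 for
    # the best and 0.12 for the worst rank in this example.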
    def update(self, new_cand=None):
        """The update method in Population will add to the end of
        the population, which cannot be used here since the fitness
        will potentially change for all candidates when new ones are
        added; therefore the population is simply recalculated every
        time. New candidates are required (they must not be added
        before calling this method). The maximum gain dynamic niching
        (MGDN) algorithm is executed.
        """
        if new_cand is not None:
            # Update the upper envelope
            prev_max_scores = self.max_scores.copy()
            gained_ids = []
            for i, a in enumerate(new_cand):
                scores = a.info['data']['raw_scores']
                gained_niches = np.argwhere(scores > self.max_scores)
                if gained_niches.size != 0:
                    self.max_scores[gained_niches] = scores[gained_niches]
                    gained_ids.append(i)

            # Update the array that records the niche dominating the other
            # gained niches, with the requirements that it: 1. contributes
            # to the updated upper envelope; 2. has the maximum gain
            # compared to the previous upper envelope
            first_generation = np.any(prev_max_scores == -np.inf)
            for i in gained_ids:
                scores = new_cand[i].info['data']['raw_scores']
                maxed_niches = np.argwhere(scores == self.max_scores)
                if maxed_niches.size != 0:
                    if first_generation:
                        dominating_niche = int(max(
                            maxed_niches, key=lambda x: scores[x]))
                    else:
                        dominating_niche = int(max(
                            maxed_niches,
                            key=lambda x: scores[x] - prev_max_scores[x]))
                    self.dominating_niches[maxed_niches] = dominating_niche

            # Calculate the effective fitness and assign a niche to
            # each new candidate
            for i in range(len(new_cand)):
                scores = new_cand[i].info['data']['raw_scores']
                min_loss_niche = np.argmax(scores - self.max_scores)
                dominating_niche = self.dominating_niches[min_loss_niche]
                f_eff = float(np.around(scores[min_loss_niche] -
                                        self.max_scores[min_loss_niche], 8))
                new_cand[i].info['key_value_pairs']['raw_score'] = f_eff
                new_cand[i].info['key_value_pairs'][
                    'dominating_niche'] = dominating_niche
                new_cand[i].info['key_value_pairs'][
                    'best_niche'] = min_loss_niche
                new_cand[i].info['data']['niches'] = np.argwhere(
                    self.dominating_niches == dominating_niche).flatten()

            # Update the fitness of all previously relaxed candidates if
            # fitness is gained in any niche by the new generation
            # (niche migration)
            updated_cand = []
            if gained_ids and (len(self.pop) > 0):
                # Update the database
                prev_cand = self.dc.get_all_relaxed_candidates()
                prev_cand.sort(key=lambda x: x.info['confid'])
                del_ids = []
                for a in prev_cand:
                    scores = a.info['data']['raw_scores']
                    min_loss_niche = np.argmax(scores - self.max_scores)
                    dominating_niche = self.dominating_niches[min_loss_niche]
                    f_eff = float(np.around(
                        scores[min_loss_niche] -
                        self.max_scores[min_loss_niche], 8))
                    a.info['key_value_pairs']['raw_score'] = f_eff
                    a.info['key_value_pairs'][
                        'dominating_niche'] = dominating_niche
                    a.info['key_value_pairs']['best_niche'] = min_loss_niche
                    a.info['data']['niches'] = np.argwhere(
                        self.dominating_niches == dominating_niche).flatten()
                    updated_cand.append(a)
                    gaid = a.info['confid']
                    del_ids.append(gaid)
                self.dc.c.delete(del_ids)
                self.rep_no_gain = 0
            else:
                self.rep_no_gain += 1

            self.dc.add_more_relaxed_candidates(updated_cand + new_cand)
            self.pop = []

        self.__initialize_pop__()
        self._write_log()
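    # Worked illustration of the MGDN bookkeeping (assumed numbers, two
    # tasks): with a current upper envelope max_scores = [-1.0, -2.0], a
    # new candidate with raw_scores = [-0.5, -2.5] raises the envelope in
    # niche 0 to -0.5 and becomes the dominating candidate there. Against
    # the updated envelope its effective score is
    # max(raw_scores - max_scores) = max([0.0, -0.5]) = 0.0, while a second
    # candidate with raw_scores = [-0.8, -2.4] gets best_niche 0 and an
    # effective score of -0.3. Candidates on the envelope thus score 0 and
    # all others score negative.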
    def __initialize_pop__(self):
        # Get all relaxed candidates from the database
        ue = self.use_extinct
        all_cand = self.dc.get_all_relaxed_candidates(use_extinct=ue)
        all_cand.sort(key=get_raw_score, reverse=True)

        if len(all_cand) > 0:
            fitf = self.__get_fitness__(all_cand)
            all_sorted = list(zip(fitf, all_cand))
            all_sorted.sort(key=itemgetter(0), reverse=True)
            sort_cand = []
            for _, t2 in all_sorted:
                sort_cand.append(t2)
            all_sorted = sort_cand

            # Fill up the population with the self.pop_size most
            # stable unique candidates.
            i = 0
            while i < len(all_sorted) and len(self.pop) < self.pop_size:
                c = all_sorted[i]
                c_vf = self.vf(c)
                i += 1
                eq = False
                for a in self.pop:
                    a_vf = self.vf(a)
                    # Only run the comparator if the variable function
                    # (self.vf) returns the same value. If it returns
                    # something different, the candidates are inherently
                    # different. This is done to speed things up.
                    if a_vf == c_vf:
                        if self.comparator.looks_like(a, c):
                            eq = True
                            break
                if not eq:
                    self.pop.append(c)

            self.current_fitness = self.__get_fitness__(self.pop)

        self.all_cand = all_cand
    def get_two_candidates(self):
        """Returns two candidates for pairing employing the roulette
        wheel selection scheme described in
        R.L. Johnston, Dalton Transactions,
        Vol. 22, No. 22 (2003), pp. 4193-4207.
        """
        if len(self.pop) < 2:
            self.update()

        if len(self.pop) < 2:
            return None

        # Use the saved fitness
        fit = self.current_fitness
        fmax = max(fit)
        c1 = self.pop[0]
        c2 = self.pop[0]
        while c1.info['confid'] == c2.info['confid']:
            nnf = True
            while nnf:
                t = self.rng.randint(len(self.pop))
                if fit[t] > self.rng.rand() * fmax:
                    c1 = self.pop[t]
                    nnf = False
            nnf = True
            while nnf:
                t = self.rng.randint(len(self.pop))
                if fit[t] > self.rng.rand() * fmax:
                    c2 = self.pop[t]
                    nnf = False

        return (c1.copy(), c2.copy())
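    # Selection note (illustrative): in each draw a candidate with fitness
    # fit[t] is accepted with probability fit[t] / fmax, so the fittest
    # member is always accepted when drawn and a member at half the maximum
    # fitness is accepted in roughly half of its draws.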
class MultitaskRepetitionConvergence(Convergence):
    """Returns True if the latest finished population has shown no
    fitness gain in any task for a given number of generations.

    Parameters
    ----------
    number_of_generations : int
        How many consecutive generations without fitness gain are
        needed before convergence is reached.

    max_generations : int, default 100000000
        The maximum number of generations the GA is allowed to run
        (effectively indefinite by default).

    """

    def __init__(self, population_instance, number_of_generations,
                 max_generations=100000000):
        Convergence.__init__(self, population_instance)
        self.numgens = number_of_generations
        self.maxgen = max_generations
    def converged(self):
        size = self.pop.pop_size
        cur_gen_num = self.pop.dc.get_generation_number(size)

        if cur_gen_num >= self.maxgen:
            return True

        if cur_gen_num <= 1:
            return False

        if self.pop.rep_no_gain >= self.numgens:
            return True

        return False
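# A minimal sketch of plugging the convergence criterion into a GA loop
# (assumptions: 'pop' is a MultitaskPopulation created as above, and
# generate_and_relax_generation() is a hypothetical placeholder for the
# user's candidate generation and relaxation step):
#
#     cc = MultitaskRepetitionConvergence(pop, number_of_generations=5)
#     while not cc.converged():
#         new_cand = generate_and_relax_generation()
#         pop.update(new_cand)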