aegis_sim.dataclasses.phenotypes
Wrapper for phenotype vectors.
1"""Wrapper for phenotype vectors.""" 2 3import numba 4import numpy as np 5from aegis_sim import parameterization 6from aegis_sim.constants import GENETIC_TRAITS 7 8 9class Phenotypes: 10 """ 11 Wrapper for phenotype vectors. 12 Shape is [individuals, phenotypic values]. 13 Traits are ordered as in GENETIC_TRAITS. 14 """ 15 16 def __init__(self, array): 17 18 if len(array.shape) > 1: # TODO remove this condition once the 'hacky' solution is gone 19 assert ( 20 array.shape[1] == parameterization.expected_phenotype_length 21 ), f"Array shape ({array.shape[1]}) should be equal to ({parameterization.expected_phenotype_length})" 22 23 # clip phenotype to [0,1] / Apply lo and hi bound 24 array = self.clip_array_to_01(array) 25 self.array = array 26 27 @staticmethod 28 def init_phenotype_array(popsize): 29 array = np.zeros(shape=(popsize, parameterization.expected_phenotype_length)) 30 return Phenotypes(array) 31 32 def __len__(self): 33 return len(self.array) 34 35 # def to_frame(self, AGE_LIMIT): 36 37 # df = pd.DataFrame(self.array) 38 39 # # Edit index 40 # # df.reset_index(drop=True, inplace=True) 41 # # df.index.name = "individual" 42 43 # # Edit columns 44 # traits = constants.GENETIC_TRAITS 45 46 # # AGE_LIMIT = parameterization.parametermanager.parameters.AGE_LIMIT 47 # ages = [str(a) for a in range(AGE_LIMIT)] 48 # multi_columns = pd.MultiIndex.from_product([traits, ages], names=["trait", "age"]) 49 # df.columns = multi_columns 50 51 # return df 52 53 @staticmethod 54 def get_number_of_evolvable_traits(): 55 return sum(trait.evolvable for trait in parameterization.traits.values()) 56 57 def get(self, individuals=slice(None), loci=slice(None)): 58 return self.array[individuals, loci] 59 60 def add(self, phenotypes): 61 self.array = np.concatenate([self.array, phenotypes.array]) 62 63 def keep(self, individuals): 64 self.array = self.array[individuals] 65 66 def extract(self, trait_name, ages, part=None): 67 68 # TODO Shift some responsibilities to Phenotypes dataclass 69 70 # Generate index of target individuals 71 72 n_individuals = len(self) 73 74 which_individuals = np.arange(n_individuals) 75 76 if part is not None: 77 which_individuals = which_individuals[part] 78 79 # Fetch trait in question 80 trait = parameterization.traits[trait_name] 81 82 # Reminder. 83 # Traits can be evolvable and age-specific (thus different across individuals and ages) 84 # Traits can be evolvable and non age-specific (thus different between individuals but same across ages) 85 # Traits can be non-evolvable (thus same for all individuals at all ages) 86 87 if not trait.evolvable: 88 probs = trait.initpheno 89 else: 90 which_loci = trait.start 91 if trait.agespecific: 92 shift_by_age = ages[which_individuals] 93 which_loci += shift_by_age 94 probs = self.get(which_individuals, which_loci) 95 96 # expand values back into an array with shape of whole population 97 final_probs = np.zeros(n_individuals, dtype=np.float32) 98 final_probs[which_individuals] += probs 99 100 return final_probs 101 102 def clip_array_to_01(self, array): 103 104 is_egg_container = len(array.shape) == 1 105 if is_egg_container: 106 # Eggs do not have computed phenotypes. Only when they hatch, are their phenotypes computed. 107 return array 108 109 for trait_name in GENETIC_TRAITS: 110 lo = parameterization.traits[trait_name].lo 111 hi = parameterization.traits[trait_name].hi 112 113 # get old values 114 _, _, slice_ = Phenotypes.get_trait_position(trait_name=trait_name) 115 values = array[:, slice_] 116 117 # clip 118 new_values = lo + values * (hi - lo) 119 120 # set new values 121 array[:, slice_] = new_values 122 123 return array 124 125 @staticmethod 126 def gaussian_smoothing(array): 127 128 SMOOTHING_FACTOR = parameterization.parametermanager.parameters.SMOOTHING_FACTOR 129 if SMOOTHING_FACTOR == 0: 130 return array 131 132 for trait_name in GENETIC_TRAITS: 133 # get old values 134 _, _, slice_ = Phenotypes.get_trait_position(trait_name=trait_name) 135 values = array[:, slice_] 136 137 if values.size == 0: 138 continue 139 140 # clip 141 142 new_values = gaussian_smooth_rows_with_padding_numba(values, sigma=SMOOTHING_FACTOR) 143 144 # set new values 145 array[:, slice_] = new_values 146 147 return array 148 149 @staticmethod 150 def get_trait_position(trait_name): 151 index = GENETIC_TRAITS.index(trait_name) 152 AGE_LIMIT = parameterization.parametermanager.parameters.AGE_LIMIT 153 start = index * AGE_LIMIT 154 end = start + AGE_LIMIT 155 slice_ = slice(start, end) 156 return start, end, slice_ 157 158 # @staticmethod 159 # def clip_array_to_01(array): 160 # for trait in parameterization.traits.values(): 161 # array[:, trait.slice] = Phenotypes.clip_trait_to_01(array[:, trait.slice], trait.name) 162 # return array 163 164 # @staticmethod 165 # def clip_trait_to_01(array, traitname): 166 # lo = parameterization.traits[traitname].lo 167 # hi = parameterization.traits[traitname].hi 168 # return lo + array * (hi - lo) 169 170 # # Recording functions 171 # def to_feather(self): 172 # for_feather = pd.DataFrame(self.array) 173 # for_feather.columns = for_feather.columns.astype(str) 174 # return for_feather 175 176 # @staticmethod 177 # def from_feather(path: pathlib.Path): 178 # from_feather = pd.read_feather(path) 179 # return Phenotypes(array=from_feather) 180 181 182# Smoothing functions 183def gaussian_kernel1d(sigma, radius=None): 184 if radius is None: 185 radius = int(3 * sigma) 186 x = np.arange(-radius, radius + 1) 187 kernel = np.exp(-0.5 * (x / sigma) ** 2) 188 kernel /= kernel.sum() 189 return kernel 190 191 192def gaussian_smooth_rows_with_padding(array_2d, sigma): 193 kernel = gaussian_kernel1d(sigma) 194 pad = len(kernel) // 2 195 smoothed = np.array([np.convolve(np.pad(row, pad, mode="reflect"), kernel, mode="valid") for row in array_2d]) 196 return smoothed 197 198 199@numba.njit 200def create_gaussian_kernel1d(sigma): 201 radius = int(3 * sigma) 202 size = 2 * radius + 1 203 kernel = np.empty(size, dtype=np.float64) 204 for i in range(size): 205 x = i - radius 206 kernel[i] = np.exp(-(x * x) / (2 * sigma * sigma)) 207 kernel /= kernel.sum() 208 return kernel 209 210 211@numba.njit 212def reflect_pad_1d(row, pad): 213 padded = np.empty(row.size + 2 * pad, dtype=np.float64) 214 for i in range(pad): 215 padded[i] = row[pad - i] 216 padded[pad : pad + row.size] = row 217 for i in range(pad): 218 padded[pad + row.size + i] = row[row.size - 2 - i] 219 return padded 220 221 222@numba.njit 223def convolve_valid(padded_row, kernel): 224 output = np.empty(padded_row.size - kernel.size + 1, dtype=np.float64) 225 for i in range(output.size): 226 val = 0.0 227 for j in range(kernel.size): 228 val += padded_row[i + j] * kernel[j] 229 output[i] = val 230 return output 231 232 233@numba.njit 234def gaussian_smooth_rows_with_padding_numba(array_2d, sigma): 235 kernel = create_gaussian_kernel1d(sigma) 236 pad = kernel.size // 2 237 n_rows, n_cols = array_2d.shape 238 output = np.empty_like(array_2d) 239 for i in range(n_rows): 240 padded_row = reflect_pad_1d(array_2d[i], pad) 241 smoothed_row = convolve_valid(padded_row, kernel) 242 output[i] = smoothed_row 243 return output
class
Phenotypes:
10class Phenotypes: 11 """ 12 Wrapper for phenotype vectors. 13 Shape is [individuals, phenotypic values]. 14 Traits are ordered as in GENETIC_TRAITS. 15 """ 16 17 def __init__(self, array): 18 19 if len(array.shape) > 1: # TODO remove this condition once the 'hacky' solution is gone 20 assert ( 21 array.shape[1] == parameterization.expected_phenotype_length 22 ), f"Array shape ({array.shape[1]}) should be equal to ({parameterization.expected_phenotype_length})" 23 24 # clip phenotype to [0,1] / Apply lo and hi bound 25 array = self.clip_array_to_01(array) 26 self.array = array 27 28 @staticmethod 29 def init_phenotype_array(popsize): 30 array = np.zeros(shape=(popsize, parameterization.expected_phenotype_length)) 31 return Phenotypes(array) 32 33 def __len__(self): 34 return len(self.array) 35 36 # def to_frame(self, AGE_LIMIT): 37 38 # df = pd.DataFrame(self.array) 39 40 # # Edit index 41 # # df.reset_index(drop=True, inplace=True) 42 # # df.index.name = "individual" 43 44 # # Edit columns 45 # traits = constants.GENETIC_TRAITS 46 47 # # AGE_LIMIT = parameterization.parametermanager.parameters.AGE_LIMIT 48 # ages = [str(a) for a in range(AGE_LIMIT)] 49 # multi_columns = pd.MultiIndex.from_product([traits, ages], names=["trait", "age"]) 50 # df.columns = multi_columns 51 52 # return df 53 54 @staticmethod 55 def get_number_of_evolvable_traits(): 56 return sum(trait.evolvable for trait in parameterization.traits.values()) 57 58 def get(self, individuals=slice(None), loci=slice(None)): 59 return self.array[individuals, loci] 60 61 def add(self, phenotypes): 62 self.array = np.concatenate([self.array, phenotypes.array]) 63 64 def keep(self, individuals): 65 self.array = self.array[individuals] 66 67 def extract(self, trait_name, ages, part=None): 68 69 # TODO Shift some responsibilities to Phenotypes dataclass 70 71 # Generate index of target individuals 72 73 n_individuals = len(self) 74 75 which_individuals = np.arange(n_individuals) 76 77 if part is not None: 78 which_individuals = which_individuals[part] 79 80 # Fetch trait in question 81 trait = parameterization.traits[trait_name] 82 83 # Reminder. 84 # Traits can be evolvable and age-specific (thus different across individuals and ages) 85 # Traits can be evolvable and non age-specific (thus different between individuals but same across ages) 86 # Traits can be non-evolvable (thus same for all individuals at all ages) 87 88 if not trait.evolvable: 89 probs = trait.initpheno 90 else: 91 which_loci = trait.start 92 if trait.agespecific: 93 shift_by_age = ages[which_individuals] 94 which_loci += shift_by_age 95 probs = self.get(which_individuals, which_loci) 96 97 # expand values back into an array with shape of whole population 98 final_probs = np.zeros(n_individuals, dtype=np.float32) 99 final_probs[which_individuals] += probs 100 101 return final_probs 102 103 def clip_array_to_01(self, array): 104 105 is_egg_container = len(array.shape) == 1 106 if is_egg_container: 107 # Eggs do not have computed phenotypes. Only when they hatch, are their phenotypes computed. 108 return array 109 110 for trait_name in GENETIC_TRAITS: 111 lo = parameterization.traits[trait_name].lo 112 hi = parameterization.traits[trait_name].hi 113 114 # get old values 115 _, _, slice_ = Phenotypes.get_trait_position(trait_name=trait_name) 116 values = array[:, slice_] 117 118 # clip 119 new_values = lo + values * (hi - lo) 120 121 # set new values 122 array[:, slice_] = new_values 123 124 return array 125 126 @staticmethod 127 def gaussian_smoothing(array): 128 129 SMOOTHING_FACTOR = parameterization.parametermanager.parameters.SMOOTHING_FACTOR 130 if SMOOTHING_FACTOR == 0: 131 return array 132 133 for trait_name in GENETIC_TRAITS: 134 # get old values 135 _, _, slice_ = Phenotypes.get_trait_position(trait_name=trait_name) 136 values = array[:, slice_] 137 138 if values.size == 0: 139 continue 140 141 # clip 142 143 new_values = gaussian_smooth_rows_with_padding_numba(values, sigma=SMOOTHING_FACTOR) 144 145 # set new values 146 array[:, slice_] = new_values 147 148 return array 149 150 @staticmethod 151 def get_trait_position(trait_name): 152 index = GENETIC_TRAITS.index(trait_name) 153 AGE_LIMIT = parameterization.parametermanager.parameters.AGE_LIMIT 154 start = index * AGE_LIMIT 155 end = start + AGE_LIMIT 156 slice_ = slice(start, end) 157 return start, end, slice_ 158 159 # @staticmethod 160 # def clip_array_to_01(array): 161 # for trait in parameterization.traits.values(): 162 # array[:, trait.slice] = Phenotypes.clip_trait_to_01(array[:, trait.slice], trait.name) 163 # return array 164 165 # @staticmethod 166 # def clip_trait_to_01(array, traitname): 167 # lo = parameterization.traits[traitname].lo 168 # hi = parameterization.traits[traitname].hi 169 # return lo + array * (hi - lo) 170 171 # # Recording functions 172 # def to_feather(self): 173 # for_feather = pd.DataFrame(self.array) 174 # for_feather.columns = for_feather.columns.astype(str) 175 # return for_feather 176 177 # @staticmethod 178 # def from_feather(path: pathlib.Path): 179 # from_feather = pd.read_feather(path) 180 # return Phenotypes(array=from_feather)
Wrapper for phenotype vectors. Shape is [individuals, phenotypic values]. Traits are ordered as in GENETIC_TRAITS.
Phenotypes(array)
17 def __init__(self, array): 18 19 if len(array.shape) > 1: # TODO remove this condition once the 'hacky' solution is gone 20 assert ( 21 array.shape[1] == parameterization.expected_phenotype_length 22 ), f"Array shape ({array.shape[1]}) should be equal to ({parameterization.expected_phenotype_length})" 23 24 # clip phenotype to [0,1] / Apply lo and hi bound 25 array = self.clip_array_to_01(array) 26 self.array = array
def
extract(self, trait_name, ages, part=None):
67 def extract(self, trait_name, ages, part=None): 68 69 # TODO Shift some responsibilities to Phenotypes dataclass 70 71 # Generate index of target individuals 72 73 n_individuals = len(self) 74 75 which_individuals = np.arange(n_individuals) 76 77 if part is not None: 78 which_individuals = which_individuals[part] 79 80 # Fetch trait in question 81 trait = parameterization.traits[trait_name] 82 83 # Reminder. 84 # Traits can be evolvable and age-specific (thus different across individuals and ages) 85 # Traits can be evolvable and non age-specific (thus different between individuals but same across ages) 86 # Traits can be non-evolvable (thus same for all individuals at all ages) 87 88 if not trait.evolvable: 89 probs = trait.initpheno 90 else: 91 which_loci = trait.start 92 if trait.agespecific: 93 shift_by_age = ages[which_individuals] 94 which_loci += shift_by_age 95 probs = self.get(which_individuals, which_loci) 96 97 # expand values back into an array with shape of whole population 98 final_probs = np.zeros(n_individuals, dtype=np.float32) 99 final_probs[which_individuals] += probs 100 101 return final_probs
def
clip_array_to_01(self, array):
103 def clip_array_to_01(self, array): 104 105 is_egg_container = len(array.shape) == 1 106 if is_egg_container: 107 # Eggs do not have computed phenotypes. Only when they hatch, are their phenotypes computed. 108 return array 109 110 for trait_name in GENETIC_TRAITS: 111 lo = parameterization.traits[trait_name].lo 112 hi = parameterization.traits[trait_name].hi 113 114 # get old values 115 _, _, slice_ = Phenotypes.get_trait_position(trait_name=trait_name) 116 values = array[:, slice_] 117 118 # clip 119 new_values = lo + values * (hi - lo) 120 121 # set new values 122 array[:, slice_] = new_values 123 124 return array
@staticmethod
def
gaussian_smoothing(array):
126 @staticmethod 127 def gaussian_smoothing(array): 128 129 SMOOTHING_FACTOR = parameterization.parametermanager.parameters.SMOOTHING_FACTOR 130 if SMOOTHING_FACTOR == 0: 131 return array 132 133 for trait_name in GENETIC_TRAITS: 134 # get old values 135 _, _, slice_ = Phenotypes.get_trait_position(trait_name=trait_name) 136 values = array[:, slice_] 137 138 if values.size == 0: 139 continue 140 141 # clip 142 143 new_values = gaussian_smooth_rows_with_padding_numba(values, sigma=SMOOTHING_FACTOR) 144 145 # set new values 146 array[:, slice_] = new_values 147 148 return array
def
gaussian_kernel1d(sigma, radius=None):
def
gaussian_smooth_rows_with_padding(array_2d, sigma):
@numba.njit
def
create_gaussian_kernel1d(sigma):
@numba.njit
def
reflect_pad_1d(row, pad):
@numba.njit
def
convolve_valid(padded_row, kernel):
@numba.njit
def
gaussian_smooth_rows_with_padding_numba(array_2d, sigma):
234@numba.njit 235def gaussian_smooth_rows_with_padding_numba(array_2d, sigma): 236 kernel = create_gaussian_kernel1d(sigma) 237 pad = kernel.size // 2 238 n_rows, n_cols = array_2d.shape 239 output = np.empty_like(array_2d) 240 for i in range(n_rows): 241 padded_row = reflect_pad_1d(array_2d[i], pad) 242 smoothed_row = convolve_valid(padded_row, kernel) 243 output[i] = smoothed_row 244 return output