aegis_sim.dataclasses.phenotypes

Wrapper for phenotype vectors.

  1"""Wrapper for phenotype vectors."""
  2
  3import numba
  4import numpy as np
  5from aegis_sim import parameterization
  6from aegis_sim.constants import GENETIC_TRAITS
  7
  8
  9class Phenotypes:
 10    """
 11    Wrapper for phenotype vectors.
 12    Shape is [individuals, phenotypic values].
 13    Traits are ordered as in GENETIC_TRAITS.
 14    """
 15
 16    def __init__(self, array):
 17
 18        if len(array.shape) > 1:  # TODO remove this condition once the 'hacky' solution is gone
 19            assert (
 20                array.shape[1] == parameterization.expected_phenotype_length
 21            ), f"Array shape ({array.shape[1]}) should be equal to ({parameterization.expected_phenotype_length})"
 22
 23        # clip phenotype to [0,1] / Apply lo and hi bound
 24        array = self.clip_array_to_01(array)
 25        self.array = array
 26
 27    @staticmethod
 28    def init_phenotype_array(popsize):
 29        array = np.zeros(shape=(popsize, parameterization.expected_phenotype_length))
 30        return Phenotypes(array)
 31
 32    def __len__(self):
 33        return len(self.array)
 34
 35    # def to_frame(self, AGE_LIMIT):
 36
 37    #     df = pd.DataFrame(self.array)
 38
 39    #     # Edit index
 40    #     # df.reset_index(drop=True, inplace=True)
 41    #     # df.index.name = "individual"
 42
 43    #     # Edit columns
 44    #     traits = constants.GENETIC_TRAITS
 45
 46    #     # AGE_LIMIT = parameterization.parametermanager.parameters.AGE_LIMIT
 47    #     ages = [str(a) for a in range(AGE_LIMIT)]
 48    #     multi_columns = pd.MultiIndex.from_product([traits, ages], names=["trait", "age"])
 49    #     df.columns = multi_columns
 50
 51    #     return df
 52
 53    @staticmethod
 54    def get_number_of_evolvable_traits():
 55        return sum(trait.evolvable for trait in parameterization.traits.values())
 56
 57    def get(self, individuals=slice(None), loci=slice(None)):
 58        return self.array[individuals, loci]
 59
 60    def add(self, phenotypes):
 61        self.array = np.concatenate([self.array, phenotypes.array])
 62
 63    def keep(self, individuals):
 64        self.array = self.array[individuals]
 65
 66    def extract(self, trait_name, ages, part=None):
 67
 68        # TODO Shift some responsibilities to Phenotypes dataclass
 69
 70        # Generate index of target individuals
 71
 72        n_individuals = len(self)
 73
 74        which_individuals = np.arange(n_individuals)
 75
 76        if part is not None:
 77            which_individuals = which_individuals[part]
 78
 79        # Fetch trait in question
 80        trait = parameterization.traits[trait_name]
 81
 82        # Reminder.
 83        # Traits can be evolvable and age-specific (thus different across individuals and ages)
 84        # Traits can be evolvable and non age-specific (thus different between individuals but same across ages)
 85        # Traits can be non-evolvable (thus same for all individuals at all ages)
 86
 87        if not trait.evolvable:
 88            probs = trait.initpheno
 89        else:
 90            which_loci = trait.start
 91            if trait.agespecific:
 92                shift_by_age = ages[which_individuals]
 93                which_loci += shift_by_age
 94            probs = self.get(which_individuals, which_loci)
 95
 96        # expand values back into an array with shape of whole population
 97        final_probs = np.zeros(n_individuals, dtype=np.float32)
 98        final_probs[which_individuals] += probs
 99
100        return final_probs
101
102    def clip_array_to_01(self, array):
103
104        is_egg_container = len(array.shape) == 1
105        if is_egg_container:
106            # Eggs do not have computed phenotypes. Only when they hatch, are their phenotypes computed.
107            return array
108
109        for trait_name in GENETIC_TRAITS:
110            lo = parameterization.traits[trait_name].lo
111            hi = parameterization.traits[trait_name].hi
112
113            # get old values
114            _, _, slice_ = Phenotypes.get_trait_position(trait_name=trait_name)
115            values = array[:, slice_]
116
117            # clip
118            new_values = lo + values * (hi - lo)
119
120            # set new values
121            array[:, slice_] = new_values
122
123        return array
124
125    @staticmethod
126    def gaussian_smoothing(array):
127
128        SMOOTHING_FACTOR = parameterization.parametermanager.parameters.SMOOTHING_FACTOR
129        if SMOOTHING_FACTOR == 0:
130            return array
131
132        for trait_name in GENETIC_TRAITS:
133            # get old values
134            _, _, slice_ = Phenotypes.get_trait_position(trait_name=trait_name)
135            values = array[:, slice_]
136
137            if values.size == 0:
138                continue
139
140            # clip
141
142            new_values = gaussian_smooth_rows_with_padding_numba(values, sigma=SMOOTHING_FACTOR)
143
144            # set new values
145            array[:, slice_] = new_values
146
147        return array
148
149    @staticmethod
150    def get_trait_position(trait_name):
151        index = GENETIC_TRAITS.index(trait_name)
152        AGE_LIMIT = parameterization.parametermanager.parameters.AGE_LIMIT
153        start = index * AGE_LIMIT
154        end = start + AGE_LIMIT
155        slice_ = slice(start, end)
156        return start, end, slice_
157
158    # @staticmethod
159    # def clip_array_to_01(array):
160    #     for trait in parameterization.traits.values():
161    #         array[:, trait.slice] = Phenotypes.clip_trait_to_01(array[:, trait.slice], trait.name)
162    #     return array
163
164    # @staticmethod
165    # def clip_trait_to_01(array, traitname):
166    #     lo = parameterization.traits[traitname].lo
167    #     hi = parameterization.traits[traitname].hi
168    #     return lo + array * (hi - lo)
169
170    # # Recording functions
171    # def to_feather(self):
172    #     for_feather = pd.DataFrame(self.array)
173    #     for_feather.columns = for_feather.columns.astype(str)
174    #     return for_feather
175
176    # @staticmethod
177    # def from_feather(path: pathlib.Path):
178    #     from_feather = pd.read_feather(path)
179    #     return Phenotypes(array=from_feather)
180
181
182# Smoothing functions
183def gaussian_kernel1d(sigma, radius=None):
184    if radius is None:
185        radius = int(3 * sigma)
186    x = np.arange(-radius, radius + 1)
187    kernel = np.exp(-0.5 * (x / sigma) ** 2)
188    kernel /= kernel.sum()
189    return kernel
190
191
192def gaussian_smooth_rows_with_padding(array_2d, sigma):
193    kernel = gaussian_kernel1d(sigma)
194    pad = len(kernel) // 2
195    smoothed = np.array([np.convolve(np.pad(row, pad, mode="reflect"), kernel, mode="valid") for row in array_2d])
196    return smoothed
197
198
199@numba.njit
200def create_gaussian_kernel1d(sigma):
201    radius = int(3 * sigma)
202    size = 2 * radius + 1
203    kernel = np.empty(size, dtype=np.float64)
204    for i in range(size):
205        x = i - radius
206        kernel[i] = np.exp(-(x * x) / (2 * sigma * sigma))
207    kernel /= kernel.sum()
208    return kernel
209
210
211@numba.njit
212def reflect_pad_1d(row, pad):
213    padded = np.empty(row.size + 2 * pad, dtype=np.float64)
214    for i in range(pad):
215        padded[i] = row[pad - i]
216    padded[pad : pad + row.size] = row
217    for i in range(pad):
218        padded[pad + row.size + i] = row[row.size - 2 - i]
219    return padded
220
221
222@numba.njit
223def convolve_valid(padded_row, kernel):
224    output = np.empty(padded_row.size - kernel.size + 1, dtype=np.float64)
225    for i in range(output.size):
226        val = 0.0
227        for j in range(kernel.size):
228            val += padded_row[i + j] * kernel[j]
229        output[i] = val
230    return output
231
232
233@numba.njit
234def gaussian_smooth_rows_with_padding_numba(array_2d, sigma):
235    kernel = create_gaussian_kernel1d(sigma)
236    pad = kernel.size // 2
237    n_rows, n_cols = array_2d.shape
238    output = np.empty_like(array_2d)
239    for i in range(n_rows):
240        padded_row = reflect_pad_1d(array_2d[i], pad)
241        smoothed_row = convolve_valid(padded_row, kernel)
242        output[i] = smoothed_row
243    return output
class Phenotypes:
 10class Phenotypes:
 11    """
 12    Wrapper for phenotype vectors.
 13    Shape is [individuals, phenotypic values].
 14    Traits are ordered as in GENETIC_TRAITS.
 15    """
 16
 17    def __init__(self, array):
 18
 19        if len(array.shape) > 1:  # TODO remove this condition once the 'hacky' solution is gone
 20            assert (
 21                array.shape[1] == parameterization.expected_phenotype_length
 22            ), f"Array shape ({array.shape[1]}) should be equal to ({parameterization.expected_phenotype_length})"
 23
 24        # clip phenotype to [0,1] / Apply lo and hi bound
 25        array = self.clip_array_to_01(array)
 26        self.array = array
 27
 28    @staticmethod
 29    def init_phenotype_array(popsize):
 30        array = np.zeros(shape=(popsize, parameterization.expected_phenotype_length))
 31        return Phenotypes(array)
 32
 33    def __len__(self):
 34        return len(self.array)
 35
 36    # def to_frame(self, AGE_LIMIT):
 37
 38    #     df = pd.DataFrame(self.array)
 39
 40    #     # Edit index
 41    #     # df.reset_index(drop=True, inplace=True)
 42    #     # df.index.name = "individual"
 43
 44    #     # Edit columns
 45    #     traits = constants.GENETIC_TRAITS
 46
 47    #     # AGE_LIMIT = parameterization.parametermanager.parameters.AGE_LIMIT
 48    #     ages = [str(a) for a in range(AGE_LIMIT)]
 49    #     multi_columns = pd.MultiIndex.from_product([traits, ages], names=["trait", "age"])
 50    #     df.columns = multi_columns
 51
 52    #     return df
 53
 54    @staticmethod
 55    def get_number_of_evolvable_traits():
 56        return sum(trait.evolvable for trait in parameterization.traits.values())
 57
 58    def get(self, individuals=slice(None), loci=slice(None)):
 59        return self.array[individuals, loci]
 60
 61    def add(self, phenotypes):
 62        self.array = np.concatenate([self.array, phenotypes.array])
 63
 64    def keep(self, individuals):
 65        self.array = self.array[individuals]
 66
 67    def extract(self, trait_name, ages, part=None):
 68
 69        # TODO Shift some responsibilities to Phenotypes dataclass
 70
 71        # Generate index of target individuals
 72
 73        n_individuals = len(self)
 74
 75        which_individuals = np.arange(n_individuals)
 76
 77        if part is not None:
 78            which_individuals = which_individuals[part]
 79
 80        # Fetch trait in question
 81        trait = parameterization.traits[trait_name]
 82
 83        # Reminder.
 84        # Traits can be evolvable and age-specific (thus different across individuals and ages)
 85        # Traits can be evolvable and non age-specific (thus different between individuals but same across ages)
 86        # Traits can be non-evolvable (thus same for all individuals at all ages)
 87
 88        if not trait.evolvable:
 89            probs = trait.initpheno
 90        else:
 91            which_loci = trait.start
 92            if trait.agespecific:
 93                shift_by_age = ages[which_individuals]
 94                which_loci += shift_by_age
 95            probs = self.get(which_individuals, which_loci)
 96
 97        # expand values back into an array with shape of whole population
 98        final_probs = np.zeros(n_individuals, dtype=np.float32)
 99        final_probs[which_individuals] += probs
100
101        return final_probs
102
103    def clip_array_to_01(self, array):
104
105        is_egg_container = len(array.shape) == 1
106        if is_egg_container:
107            # Eggs do not have computed phenotypes. Only when they hatch, are their phenotypes computed.
108            return array
109
110        for trait_name in GENETIC_TRAITS:
111            lo = parameterization.traits[trait_name].lo
112            hi = parameterization.traits[trait_name].hi
113
114            # get old values
115            _, _, slice_ = Phenotypes.get_trait_position(trait_name=trait_name)
116            values = array[:, slice_]
117
118            # clip
119            new_values = lo + values * (hi - lo)
120
121            # set new values
122            array[:, slice_] = new_values
123
124        return array
125
126    @staticmethod
127    def gaussian_smoothing(array):
128
129        SMOOTHING_FACTOR = parameterization.parametermanager.parameters.SMOOTHING_FACTOR
130        if SMOOTHING_FACTOR == 0:
131            return array
132
133        for trait_name in GENETIC_TRAITS:
134            # get old values
135            _, _, slice_ = Phenotypes.get_trait_position(trait_name=trait_name)
136            values = array[:, slice_]
137
138            if values.size == 0:
139                continue
140
141            # clip
142
143            new_values = gaussian_smooth_rows_with_padding_numba(values, sigma=SMOOTHING_FACTOR)
144
145            # set new values
146            array[:, slice_] = new_values
147
148        return array
149
150    @staticmethod
151    def get_trait_position(trait_name):
152        index = GENETIC_TRAITS.index(trait_name)
153        AGE_LIMIT = parameterization.parametermanager.parameters.AGE_LIMIT
154        start = index * AGE_LIMIT
155        end = start + AGE_LIMIT
156        slice_ = slice(start, end)
157        return start, end, slice_
158
159    # @staticmethod
160    # def clip_array_to_01(array):
161    #     for trait in parameterization.traits.values():
162    #         array[:, trait.slice] = Phenotypes.clip_trait_to_01(array[:, trait.slice], trait.name)
163    #     return array
164
165    # @staticmethod
166    # def clip_trait_to_01(array, traitname):
167    #     lo = parameterization.traits[traitname].lo
168    #     hi = parameterization.traits[traitname].hi
169    #     return lo + array * (hi - lo)
170
171    # # Recording functions
172    # def to_feather(self):
173    #     for_feather = pd.DataFrame(self.array)
174    #     for_feather.columns = for_feather.columns.astype(str)
175    #     return for_feather
176
177    # @staticmethod
178    # def from_feather(path: pathlib.Path):
179    #     from_feather = pd.read_feather(path)
180    #     return Phenotypes(array=from_feather)

Wrapper for phenotype vectors. Shape is [individuals, phenotypic values]. Traits are ordered as in GENETIC_TRAITS.

Phenotypes(array)
17    def __init__(self, array):
18
19        if len(array.shape) > 1:  # TODO remove this condition once the 'hacky' solution is gone
20            assert (
21                array.shape[1] == parameterization.expected_phenotype_length
22            ), f"Array shape ({array.shape[1]}) should be equal to ({parameterization.expected_phenotype_length})"
23
24        # clip phenotype to [0,1] / Apply lo and hi bound
25        array = self.clip_array_to_01(array)
26        self.array = array
array
@staticmethod
def init_phenotype_array(popsize):
28    @staticmethod
29    def init_phenotype_array(popsize):
30        array = np.zeros(shape=(popsize, parameterization.expected_phenotype_length))
31        return Phenotypes(array)
@staticmethod
def get_number_of_evolvable_traits():
54    @staticmethod
55    def get_number_of_evolvable_traits():
56        return sum(trait.evolvable for trait in parameterization.traits.values())
def get( self, individuals=slice(None, None, None), loci=slice(None, None, None)):
58    def get(self, individuals=slice(None), loci=slice(None)):
59        return self.array[individuals, loci]
def add(self, phenotypes):
61    def add(self, phenotypes):
62        self.array = np.concatenate([self.array, phenotypes.array])
def keep(self, individuals):
64    def keep(self, individuals):
65        self.array = self.array[individuals]
def extract(self, trait_name, ages, part=None):
 67    def extract(self, trait_name, ages, part=None):
 68
 69        # TODO Shift some responsibilities to Phenotypes dataclass
 70
 71        # Generate index of target individuals
 72
 73        n_individuals = len(self)
 74
 75        which_individuals = np.arange(n_individuals)
 76
 77        if part is not None:
 78            which_individuals = which_individuals[part]
 79
 80        # Fetch trait in question
 81        trait = parameterization.traits[trait_name]
 82
 83        # Reminder.
 84        # Traits can be evolvable and age-specific (thus different across individuals and ages)
 85        # Traits can be evolvable and non age-specific (thus different between individuals but same across ages)
 86        # Traits can be non-evolvable (thus same for all individuals at all ages)
 87
 88        if not trait.evolvable:
 89            probs = trait.initpheno
 90        else:
 91            which_loci = trait.start
 92            if trait.agespecific:
 93                shift_by_age = ages[which_individuals]
 94                which_loci += shift_by_age
 95            probs = self.get(which_individuals, which_loci)
 96
 97        # expand values back into an array with shape of whole population
 98        final_probs = np.zeros(n_individuals, dtype=np.float32)
 99        final_probs[which_individuals] += probs
100
101        return final_probs
def clip_array_to_01(self, array):
103    def clip_array_to_01(self, array):
104
105        is_egg_container = len(array.shape) == 1
106        if is_egg_container:
107            # Eggs do not have computed phenotypes. Only when they hatch, are their phenotypes computed.
108            return array
109
110        for trait_name in GENETIC_TRAITS:
111            lo = parameterization.traits[trait_name].lo
112            hi = parameterization.traits[trait_name].hi
113
114            # get old values
115            _, _, slice_ = Phenotypes.get_trait_position(trait_name=trait_name)
116            values = array[:, slice_]
117
118            # clip
119            new_values = lo + values * (hi - lo)
120
121            # set new values
122            array[:, slice_] = new_values
123
124        return array
@staticmethod
def gaussian_smoothing(array):
126    @staticmethod
127    def gaussian_smoothing(array):
128
129        SMOOTHING_FACTOR = parameterization.parametermanager.parameters.SMOOTHING_FACTOR
130        if SMOOTHING_FACTOR == 0:
131            return array
132
133        for trait_name in GENETIC_TRAITS:
134            # get old values
135            _, _, slice_ = Phenotypes.get_trait_position(trait_name=trait_name)
136            values = array[:, slice_]
137
138            if values.size == 0:
139                continue
140
141            # clip
142
143            new_values = gaussian_smooth_rows_with_padding_numba(values, sigma=SMOOTHING_FACTOR)
144
145            # set new values
146            array[:, slice_] = new_values
147
148        return array
@staticmethod
def get_trait_position(trait_name):
150    @staticmethod
151    def get_trait_position(trait_name):
152        index = GENETIC_TRAITS.index(trait_name)
153        AGE_LIMIT = parameterization.parametermanager.parameters.AGE_LIMIT
154        start = index * AGE_LIMIT
155        end = start + AGE_LIMIT
156        slice_ = slice(start, end)
157        return start, end, slice_
def gaussian_kernel1d(sigma, radius=None):
184def gaussian_kernel1d(sigma, radius=None):
185    if radius is None:
186        radius = int(3 * sigma)
187    x = np.arange(-radius, radius + 1)
188    kernel = np.exp(-0.5 * (x / sigma) ** 2)
189    kernel /= kernel.sum()
190    return kernel
def gaussian_smooth_rows_with_padding(array_2d, sigma):
193def gaussian_smooth_rows_with_padding(array_2d, sigma):
194    kernel = gaussian_kernel1d(sigma)
195    pad = len(kernel) // 2
196    smoothed = np.array([np.convolve(np.pad(row, pad, mode="reflect"), kernel, mode="valid") for row in array_2d])
197    return smoothed
@numba.njit
def create_gaussian_kernel1d(sigma):
200@numba.njit
201def create_gaussian_kernel1d(sigma):
202    radius = int(3 * sigma)
203    size = 2 * radius + 1
204    kernel = np.empty(size, dtype=np.float64)
205    for i in range(size):
206        x = i - radius
207        kernel[i] = np.exp(-(x * x) / (2 * sigma * sigma))
208    kernel /= kernel.sum()
209    return kernel
@numba.njit
def reflect_pad_1d(row, pad):
212@numba.njit
213def reflect_pad_1d(row, pad):
214    padded = np.empty(row.size + 2 * pad, dtype=np.float64)
215    for i in range(pad):
216        padded[i] = row[pad - i]
217    padded[pad : pad + row.size] = row
218    for i in range(pad):
219        padded[pad + row.size + i] = row[row.size - 2 - i]
220    return padded
@numba.njit
def convolve_valid(padded_row, kernel):
223@numba.njit
224def convolve_valid(padded_row, kernel):
225    output = np.empty(padded_row.size - kernel.size + 1, dtype=np.float64)
226    for i in range(output.size):
227        val = 0.0
228        for j in range(kernel.size):
229            val += padded_row[i + j] * kernel[j]
230        output[i] = val
231    return output
@numba.njit
def gaussian_smooth_rows_with_padding_numba(array_2d, sigma):
234@numba.njit
235def gaussian_smooth_rows_with_padding_numba(array_2d, sigma):
236    kernel = create_gaussian_kernel1d(sigma)
237    pad = kernel.size // 2
238    n_rows, n_cols = array_2d.shape
239    output = np.empty_like(array_2d)
240    for i in range(n_rows):
241        padded_row = reflect_pad_1d(array_2d[i], pad)
242        smoothed_row = convolve_valid(padded_row, kernel)
243        output[i] = smoothed_row
244    return output