aegis_sim
import logging
import pathlib

from aegis_sim.dataclasses.population import Population
from aegis_sim.bioreactor import Bioreactor
from aegis_sim import variables, submodels, parameterization
from aegis_sim.parameterization import parametermanager
from aegis_sim.recording import recordingmanager


def run(custom_config_path, pickle_path, overwrite, custom_input_params, resume_path=None, extend_steps=None):
    """Set up a simulation (fresh or resumed from a checkpoint) and run it.

    Args:
        custom_config_path: Path to the custom configuration file; forwarded to ``init``.
        pickle_path: Pickled population to start from, or None to initialize a new
            population from INITIAL_POPULATION_SIZE and AGE_LIMIT.
        overwrite: Forwarded to ``init`` for an ordinary fresh run. Forced to False
            when ``resume_path`` is given but the directory does not exist yet.
        custom_input_params: Parameter overrides forwarded to ``init``.
        resume_path: Output directory to resume from. If the directory does not
            exist, a fresh run is started; if it exists with a ``checkpoint``
            entry, the run is resumed via ``init_resume``.
        extend_steps: Forwarded to ``init_resume`` when resuming.

    Raises:
        FileNotFoundError: If ``resume_path`` exists but holds no ``checkpoint`` entry.
    """
    resumed = False

    if resume_path is None:
        init(custom_config_path, overwrite, pickle_path, custom_input_params)
    else:
        odir = pathlib.Path(resume_path)
        if not odir.exists():
            # No output dir yet → fresh run
            logging.info(f"No output directory at {odir}, starting fresh run.")
            init(custom_config_path, overwrite=False, pickle_path=pickle_path, custom_input_params=custom_input_params)
        elif not (odir / "checkpoint").exists():
            # Output dir exists but no checkpoint → error
            raise FileNotFoundError(
                f"Output directory {odir} exists but contains no checkpoint file. "
                f"Cannot resume. Use -o to overwrite, or delete the directory."
            )
        else:
            # Output dir + checkpoint → resume
            population, eggs = init_resume(resume_path, extend_steps=extend_steps)
            resumed = True

    if not resumed:
        # Both fresh-run paths build the starting population the same way.
        if pickle_path is None:
            population = Population.initialize(
                n=parametermanager.parameters.INITIAL_POPULATION_SIZE,
                AGE_LIMIT=parametermanager.parameters.AGE_LIMIT,
            )
        else:
            population = Population.load_pickle_from(pickle_path)
        eggs = None
        # Seed introgression once, on fresh populations only. A population restored
        # from a checkpoint is left untouched so its ancestry is not reset.
        population = _seed_introgression(population)

    bioreactor = Bioreactor(population)
    bioreactor.eggs = eggs
    sim(bioreactor=bioreactor)


def init(custom_config_path, overwrite=False, pickle_path=None, custom_input_params=None):
    """
    When testing aegis, initialize all modules using this function, e.g.

    import aegis_sim
    aegis_sim.init("_.yml")

    And then you can safely import any module.

    Args:
        custom_config_path: Path to the custom configuration file.
        overwrite: Forwarded to ``recordingmanager.init``.
        pickle_path: Forwarded to ``variables.init``.
        custom_input_params: Parameter overrides forwarded to
            ``parametermanager.init``; None means no overrides (empty dict).
    """
    # A fresh dict per call; a ``{}`` default would be shared between calls.
    if custom_input_params is None:
        custom_input_params = {}

    custom_config_path = pathlib.Path(custom_config_path)

    parametermanager.init(
        custom_config_path=custom_config_path,
        custom_input_params=custom_input_params,
    )
    variables.init(
        variables,
        custom_config_path=custom_config_path,
        pickle_path=pickle_path,
        RANDOM_SEED=parametermanager.parameters.RANDOM_SEED,
    )
    parameterization.init_traits(parameterization)
    submodels.init(submodels, parametermanager=parametermanager)

    recordingmanager.init(custom_config_path, overwrite)
    recordingmanager.initialize_recorders(TICKER_RATE=parametermanager.parameters.TICKER_RATE)


def init_resume(resume_path, extend_steps=None):
    """Initialize all modules from the latest checkpoint in the given output directory.

    Args:
        resume_path: Path to the output directory containing the checkpoint.
        extend_steps: If set, override STEPS_PER_SIMULATION to extend the run.

    Returns:
        Tuple ``(population, eggs)`` as stored in the checkpoint.

    Raises:
        ValueError: If ``extend_steps`` is not greater than the checkpoint step.
    """
    from aegis_sim.checkpoint import Checkpoint

    odir = pathlib.Path(resume_path)
    checkpoint_path = Checkpoint.find_latest(odir)
    checkpoint = Checkpoint.load(checkpoint_path)

    # Restore parameters from checkpoint config
    parametermanager.init_from_config(checkpoint.final_config, checkpoint.custom_config_path)

    # Apply --extend override if provided
    if extend_steps is not None:
        if extend_steps <= checkpoint.step:
            raise ValueError(
                f"--extend {extend_steps} must be greater than checkpoint step {checkpoint.step}"
            )
        # Read the old value BEFORE overwriting: parametermanager.final_config may be
        # the very same dict object as checkpoint.final_config.
        previous_steps = checkpoint.final_config["STEPS_PER_SIMULATION"]
        parametermanager.parameters.STEPS_PER_SIMULATION = extend_steps
        parametermanager.final_config["STEPS_PER_SIMULATION"] = extend_steps
        logging.info(f"Extending simulation to {extend_steps} steps (was {previous_steps}).")

    # Restore variables (step, RNG state)
    variables.restore_from_checkpoint(variables, checkpoint)

    # Re-init traits and submodels
    parameterization.init_traits(parameterization)
    submodels.init(submodels, parametermanager=parametermanager)

    # Restore envdrift map if it was active
    if checkpoint.envdrift_map is not None:
        submodels.architect.envdrift.map = checkpoint.envdrift_map

    # Restore predator population size
    submodels.predation.N = checkpoint.predator_population_size

    # Restore resource capacity
    from aegis_sim.submodels.resources.resources import resources

    resources.capacity = checkpoint.resource_capacity

    # Init recording in append mode (don't overwrite, don't write headers)
    recordingmanager.init_for_resume(checkpoint.custom_config_path)
    recordingmanager.initialize_recorders(
        TICKER_RATE=parametermanager.parameters.TICKER_RATE,
        resuming=True,
    )

    # Truncate output files to remove data recorded after the checkpoint step
    recordingmanager.truncate_for_resume(checkpoint.step)

    # Update TE recorder's file counter after truncation may have deleted files
    te_dir = recordingmanager.odir / "te"
    if te_dir.exists():
        recordingmanager.terecorder.TE_number = len(list(te_dir.glob("*.csv")))

    return checkpoint.population, checkpoint.eggs


def _seed_introgression(population: Population) -> Population:
    """If INTROGRESSION_SOURCE and INTROGRESSION_SEEDS are set, load pop A from pickle,
    sample the requested number of individuals, mark their ancestry as True (introgressed),
    set pop B ancestry to False (native), and merge into one population.
    Returns population unchanged if introgression is not configured.
    """
    import numpy as np

    n_seeds = parametermanager.parameters.INTROGRESSION_SEEDS
    source_path = parametermanager.parameters.INTROGRESSION_SOURCE

    # ``not n_seeds`` covers both 0 and an unset (None) value, which would otherwise
    # fail in the size comparison below.
    if not n_seeds or source_path is None:
        return population

    source_path = pathlib.Path(source_path)
    pop_a = Population.load_pickle_from(source_path)

    available = len(pop_a)
    if n_seeds > available:
        logging.warning(
            f"INTROGRESSION_SEEDS={n_seeds} exceeds source population size {available}; "
            f"clamping to {available}"
        )
        n_seeds = available

    # Sample n_seeds individuals from pop A (without replacement)
    indices = variables.rng.choice(available, size=n_seeds, replace=False)
    pop_a *= indices  # presumably Population's in-place subset operator; confirm in Population

    # Mark all pop A seeds as introgressed; ancestry mirrors the genome array shape
    pop_a.ancestry = np.ones(pop_a.genomes.array.shape, dtype=np.bool_)

    # Mark all pop B individuals as native
    population.ancestry = np.zeros(population.genomes.array.shape, dtype=np.bool_)

    logging.info(
        f"Introgression: seeding {n_seeds} individuals from {source_path} into population of {len(population)}"
    )

    population += pop_a
    return population


def sim(bioreactor):
    """Run the simulation loop on ``bioreactor``, with pre- and post-run recording.

    The loop runs while ``variables.steps`` has not passed STEPS_PER_SIMULATION and
    the recording manager does not report extinction.

    Args:
        bioreactor: Object providing ``population``, ``eggs`` and ``run_step()``.

    Raises:
        RuntimeError: If the ticker process has no pid after being started.
    """
    # presim
    recordingmanager.configrecorder.write_final_config_file(parametermanager.final_config)
    recordingmanager.ticker.start_process()
    ticker_pid = recordingmanager.ticker.pid
    if ticker_pid is None:
        # Explicit check instead of ``assert`` so it also holds under ``python -O``.
        raise RuntimeError("Ticker process has no pid after start_process().")

    try:
        recordingmanager.summaryrecorder.write_input_summary(ticker_pid=ticker_pid)
        # TODO hacky solution of decrementing and incrementing steps
        variables.steps -= 1
        recordingmanager.featherrecorder.write(bioreactor.population)
        variables.steps += 1

        # sim
        recordingmanager.phenomaprecorder.write()

        # Write initial checkpoint before the loop so there's always something to resume from
        if parametermanager.parameters.CHECKPOINT_RATE > 0:
            from aegis_sim.checkpoint import Checkpoint

            initial_cp = Checkpoint.capture(
                bioreactor.population, bioreactor.eggs, variables, submodels, parametermanager
            )
            initial_cp.save(recordingmanager.checkpointrecorder.checkpoint_path)
            logging.debug("Initial checkpoint saved before sim loop.")

        while (
            variables.steps <= parametermanager.parameters.STEPS_PER_SIMULATION
            and not recordingmanager.is_extinct()
        ):
            recordingmanager.progressrecorder.write(len(bioreactor.population))
            recordingmanager.simpleprogressrecorder.write()
            bioreactor.run_step()
            variables.steps += 1

        # postsim
        recordingmanager.popsizerecorder.flush_all()
        recordingmanager.resourcerecorder.flush_all()
        recordingmanager.summaryrecorder.write_output_summary()
        logging.info("Simulation finished.")
    finally:
        # Always stop the ticker, even if the run fails, so the process is not leaked.
        recordingmanager.ticker.stop_process()
def
run( custom_config_path, pickle_path, overwrite, custom_input_params, resume_path=None, extend_steps=None):
def run(custom_config_path, pickle_path, overwrite, custom_input_params, resume_path=None, extend_steps=None):
    """Set up a simulation (fresh or resumed from a checkpoint) and run it.

    Args:
        custom_config_path: Path to the custom configuration file; forwarded to ``init``.
        pickle_path: Pickled population to start from, or None to initialize a new
            population from INITIAL_POPULATION_SIZE and AGE_LIMIT.
        overwrite: Forwarded to ``init`` for an ordinary fresh run. Forced to False
            when ``resume_path`` is given but the directory does not exist yet.
        custom_input_params: Parameter overrides forwarded to ``init``.
        resume_path: Output directory to resume from. If the directory does not
            exist, a fresh run is started; if it exists with a ``checkpoint``
            entry, the run is resumed via ``init_resume``.
        extend_steps: Forwarded to ``init_resume`` when resuming.

    Raises:
        FileNotFoundError: If ``resume_path`` exists but holds no ``checkpoint`` entry.
    """
    resumed = False

    if resume_path is None:
        init(custom_config_path, overwrite, pickle_path, custom_input_params)
    else:
        odir = pathlib.Path(resume_path)
        if not odir.exists():
            # No output dir yet → fresh run
            logging.info(f"No output directory at {odir}, starting fresh run.")
            init(custom_config_path, overwrite=False, pickle_path=pickle_path, custom_input_params=custom_input_params)
        elif not (odir / "checkpoint").exists():
            # Output dir exists but no checkpoint → error
            raise FileNotFoundError(
                f"Output directory {odir} exists but contains no checkpoint file. "
                f"Cannot resume. Use -o to overwrite, or delete the directory."
            )
        else:
            # Output dir + checkpoint → resume
            population, eggs = init_resume(resume_path, extend_steps=extend_steps)
            resumed = True

    if not resumed:
        # Both fresh-run paths build the starting population the same way.
        if pickle_path is None:
            population = Population.initialize(
                n=parametermanager.parameters.INITIAL_POPULATION_SIZE,
                AGE_LIMIT=parametermanager.parameters.AGE_LIMIT,
            )
        else:
            population = Population.load_pickle_from(pickle_path)
        eggs = None
        # Seed introgression once, on fresh populations only. A population restored
        # from a checkpoint is left untouched so its ancestry is not reset.
        population = _seed_introgression(population)

    bioreactor = Bioreactor(population)
    bioreactor.eggs = eggs
    sim(bioreactor=bioreactor)
def
init( custom_config_path, overwrite=False, pickle_path=None, custom_input_params={}):
def init(custom_config_path, overwrite=False, pickle_path=None, custom_input_params=None):
    """
    When testing aegis, initialize all modules using this function, e.g.

    import aegis_sim
    aegis_sim.init("_.yml")

    And then you can safely import any module.

    Args:
        custom_config_path: Path to the custom configuration file.
        overwrite: Forwarded to ``recordingmanager.init``.
        pickle_path: Forwarded to ``variables.init``.
        custom_input_params: Parameter overrides forwarded to
            ``parametermanager.init``; None means no overrides (empty dict).
    """
    # A fresh dict per call; a ``{}`` default would be shared between calls.
    if custom_input_params is None:
        custom_input_params = {}

    custom_config_path = pathlib.Path(custom_config_path)

    parametermanager.init(
        custom_config_path=custom_config_path,
        custom_input_params=custom_input_params,
    )
    variables.init(
        variables,
        custom_config_path=custom_config_path,
        pickle_path=pickle_path,
        RANDOM_SEED=parametermanager.parameters.RANDOM_SEED,
    )
    parameterization.init_traits(parameterization)
    submodels.init(submodels, parametermanager=parametermanager)

    recordingmanager.init(custom_config_path, overwrite)
    recordingmanager.initialize_recorders(TICKER_RATE=parametermanager.parameters.TICKER_RATE)
When testing aegis, initialize all modules using this function, e.g.
import aegis_sim aegis_sim.init("_.yml")
And then you can safely import any module.
def
init_resume(resume_path, extend_steps=None):
def init_resume(resume_path, extend_steps=None):
    """Initialize all modules from the latest checkpoint in the given output directory.

    Args:
        resume_path: Path to the output directory containing the checkpoint.
        extend_steps: If set, override STEPS_PER_SIMULATION to extend the run.

    Returns:
        Tuple ``(population, eggs)`` as stored in the checkpoint.

    Raises:
        ValueError: If ``extend_steps`` is not greater than the checkpoint step.
    """
    from aegis_sim.checkpoint import Checkpoint

    odir = pathlib.Path(resume_path)
    checkpoint_path = Checkpoint.find_latest(odir)
    checkpoint = Checkpoint.load(checkpoint_path)

    # Restore parameters from checkpoint config
    parametermanager.init_from_config(checkpoint.final_config, checkpoint.custom_config_path)

    # Apply --extend override if provided
    if extend_steps is not None:
        if extend_steps <= checkpoint.step:
            raise ValueError(
                f"--extend {extend_steps} must be greater than checkpoint step {checkpoint.step}"
            )
        # Read the old value BEFORE overwriting: parametermanager.final_config may be
        # the very same dict object as checkpoint.final_config.
        previous_steps = checkpoint.final_config["STEPS_PER_SIMULATION"]
        parametermanager.parameters.STEPS_PER_SIMULATION = extend_steps
        parametermanager.final_config["STEPS_PER_SIMULATION"] = extend_steps
        logging.info(f"Extending simulation to {extend_steps} steps (was {previous_steps}).")

    # Restore variables (step, RNG state)
    variables.restore_from_checkpoint(variables, checkpoint)

    # Re-init traits and submodels
    parameterization.init_traits(parameterization)
    submodels.init(submodels, parametermanager=parametermanager)

    # Restore envdrift map if it was active
    if checkpoint.envdrift_map is not None:
        submodels.architect.envdrift.map = checkpoint.envdrift_map

    # Restore predator population size
    submodels.predation.N = checkpoint.predator_population_size

    # Restore resource capacity
    from aegis_sim.submodels.resources.resources import resources

    resources.capacity = checkpoint.resource_capacity

    # Init recording in append mode (don't overwrite, don't write headers)
    recordingmanager.init_for_resume(checkpoint.custom_config_path)
    recordingmanager.initialize_recorders(
        TICKER_RATE=parametermanager.parameters.TICKER_RATE,
        resuming=True,
    )

    # Truncate output files to remove data recorded after the checkpoint step
    recordingmanager.truncate_for_resume(checkpoint.step)

    # Update TE recorder's file counter after truncation may have deleted files
    te_dir = recordingmanager.odir / "te"
    if te_dir.exists():
        recordingmanager.terecorder.TE_number = len(list(te_dir.glob("*.csv")))

    return checkpoint.population, checkpoint.eggs
Initialize all modules from the latest checkpoint in the given output directory.
Args: resume_path: Path to the output directory containing the checkpoint. extend_steps: If set, override STEPS_PER_SIMULATION to extend the run.
def
sim(bioreactor):
def sim(bioreactor):
    """Run the simulation loop on ``bioreactor``, with pre- and post-run recording.

    The loop runs while ``variables.steps`` has not passed STEPS_PER_SIMULATION and
    the recording manager does not report extinction.

    Args:
        bioreactor: Object providing ``population``, ``eggs`` and ``run_step()``.

    Raises:
        RuntimeError: If the ticker process has no pid after being started.
    """
    # presim
    recordingmanager.configrecorder.write_final_config_file(parametermanager.final_config)
    recordingmanager.ticker.start_process()
    ticker_pid = recordingmanager.ticker.pid
    if ticker_pid is None:
        # Explicit check instead of ``assert`` so it also holds under ``python -O``.
        raise RuntimeError("Ticker process has no pid after start_process().")

    try:
        recordingmanager.summaryrecorder.write_input_summary(ticker_pid=ticker_pid)
        # TODO hacky solution of decrementing and incrementing steps
        variables.steps -= 1
        recordingmanager.featherrecorder.write(bioreactor.population)
        variables.steps += 1

        # sim
        recordingmanager.phenomaprecorder.write()

        # Write initial checkpoint before the loop so there's always something to resume from
        if parametermanager.parameters.CHECKPOINT_RATE > 0:
            from aegis_sim.checkpoint import Checkpoint

            initial_cp = Checkpoint.capture(
                bioreactor.population, bioreactor.eggs, variables, submodels, parametermanager
            )
            initial_cp.save(recordingmanager.checkpointrecorder.checkpoint_path)
            logging.debug("Initial checkpoint saved before sim loop.")

        while (
            variables.steps <= parametermanager.parameters.STEPS_PER_SIMULATION
            and not recordingmanager.is_extinct()
        ):
            recordingmanager.progressrecorder.write(len(bioreactor.population))
            recordingmanager.simpleprogressrecorder.write()
            bioreactor.run_step()
            variables.steps += 1

        # postsim
        recordingmanager.popsizerecorder.flush_all()
        recordingmanager.resourcerecorder.flush_all()
        recordingmanager.summaryrecorder.write_output_summary()
        logging.info("Simulation finished.")
    finally:
        # Always stop the ticker, even if the run fails, so the process is not leaked.
        recordingmanager.ticker.stop_process()