aegis_sim

  1import logging
  2import pathlib
  3
  4from aegis_sim.dataclasses.population import Population
  5from aegis_sim.bioreactor import Bioreactor
  6from aegis_sim import variables, submodels, parameterization
  7from aegis_sim.parameterization import parametermanager
  8from aegis_sim.recording import recordingmanager
  9
 10
 11def run(custom_config_path, pickle_path, overwrite, custom_input_params, resume_path=None, extend_steps=None):
 12    if resume_path is not None:
 13        odir = pathlib.Path(resume_path)
 14        checkpoint_file = odir / "checkpoint"
 15
 16        if not odir.exists():
 17            # No output dir yet → fresh run
 18            logging.info(f"No output directory at {odir}, starting fresh run.")
 19            init(custom_config_path, overwrite=False, pickle_path=pickle_path, custom_input_params=custom_input_params)
 20            population = (
 21                Population.initialize(
 22                    n=parametermanager.parameters.INITIAL_POPULATION_SIZE,
 23                    AGE_LIMIT=parametermanager.parameters.AGE_LIMIT,
 24                )
 25                if pickle_path is None
 26                else Population.load_pickle_from(pickle_path)
 27            )
 28            eggs = None
 29        elif not checkpoint_file.exists():
 30            # Output dir exists but no checkpoint → error
 31            raise FileNotFoundError(
 32                f"Output directory {odir} exists but contains no checkpoint file. "
 33                f"Cannot resume. Use -o to overwrite, or delete the directory."
 34            )
 35        else:
 36            # Output dir + checkpoint → resume
 37            population, eggs = init_resume(resume_path, extend_steps=extend_steps)
 38    else:
 39        init(custom_config_path, overwrite, pickle_path, custom_input_params)
 40        population = (
 41            Population.initialize(
 42                n=parametermanager.parameters.INITIAL_POPULATION_SIZE,
 43                AGE_LIMIT=parametermanager.parameters.AGE_LIMIT,
 44            )
 45            if pickle_path is None
 46            else Population.load_pickle_from(pickle_path)
 47        )
 48        eggs = None
 49
 50    bioreactor = Bioreactor(population)
 51    bioreactor.eggs = eggs
 52    sim(bioreactor=bioreactor)
 53
 54
 55def init(custom_config_path, overwrite=False, pickle_path=None, custom_input_params={}):
 56    """
 57    When testing aegis, initialize all modules using this function, e.g.
 58
 59    import aegis_sim
 60    aegis_sim.init("_.yml")
 61
 62    And then you can safely import any module.
 63    """
 64
 65    custom_config_path = pathlib.Path(custom_config_path)
 66
 67    parametermanager.init(
 68        custom_config_path=custom_config_path,
 69        custom_input_params=custom_input_params,
 70    )
 71    variables.init(
 72        variables,
 73        custom_config_path=custom_config_path,
 74        pickle_path=pickle_path,
 75        RANDOM_SEED=parametermanager.parameters.RANDOM_SEED,
 76    )
 77    parameterization.init_traits(parameterization)
 78    submodels.init(submodels, parametermanager=parametermanager)
 79
 80    recordingmanager.init(custom_config_path, overwrite)
 81    recordingmanager.initialize_recorders(TICKER_RATE=parametermanager.parameters.TICKER_RATE)
 82
 83
 84def init_resume(resume_path, extend_steps=None):
 85    """Initialize all modules from the latest checkpoint in the given output directory.
 86
 87    Args:
 88        resume_path: Path to the output directory containing the checkpoint.
 89        extend_steps: If set, override STEPS_PER_SIMULATION to extend the run.
 90    """
 91    from aegis_sim.checkpoint import Checkpoint
 92
 93    odir = pathlib.Path(resume_path)
 94    checkpoint_path = Checkpoint.find_latest(odir)
 95    checkpoint = Checkpoint.load(checkpoint_path)
 96
 97    # Restore parameters from checkpoint config
 98    parametermanager.init_from_config(checkpoint.final_config, checkpoint.custom_config_path)
 99
100    # Apply --extend override if provided
101    if extend_steps is not None:
102        if extend_steps <= checkpoint.step:
103            raise ValueError(
104                f"--extend {extend_steps} must be greater than checkpoint step {checkpoint.step}"
105            )
106        parametermanager.parameters.STEPS_PER_SIMULATION = extend_steps
107        parametermanager.final_config["STEPS_PER_SIMULATION"] = extend_steps
108        logging.info(f"Extending simulation to {extend_steps} steps (was {checkpoint.final_config['STEPS_PER_SIMULATION']}).")
109
110    # Restore variables (step, RNG state)
111    variables.restore_from_checkpoint(variables, checkpoint)
112
113    # Re-init traits and submodels
114    parameterization.init_traits(parameterization)
115    submodels.init(submodels, parametermanager=parametermanager)
116
117    # Restore envdrift map if it was active
118    if checkpoint.envdrift_map is not None:
119        submodels.architect.envdrift.map = checkpoint.envdrift_map
120
121    # Restore predator population size
122    submodels.predation.N = checkpoint.predator_population_size
123
124    # Restore resource capacity
125    from aegis_sim.submodels.resources.resources import resources
126    resources.capacity = checkpoint.resource_capacity
127
128    # Init recording in append mode (don't overwrite, don't write headers)
129    recordingmanager.init_for_resume(checkpoint.custom_config_path)
130    recordingmanager.initialize_recorders(
131        TICKER_RATE=parametermanager.parameters.TICKER_RATE,
132        resuming=True,
133    )
134
135    # Truncate output files to remove data recorded after the checkpoint step
136    recordingmanager.truncate_for_resume(checkpoint.step)
137
138    # Update TE recorder's file counter after truncation may have deleted files
139    te_dir = recordingmanager.odir / "te"
140    if te_dir.exists():
141        remaining = list(te_dir.glob("*.csv"))
142        recordingmanager.terecorder.TE_number = len(remaining)
143
144    return checkpoint.population, checkpoint.eggs
145
146
147
148def sim(bioreactor):
149    # presim
150    recordingmanager.configrecorder.write_final_config_file(parametermanager.final_config)
151    recordingmanager.ticker.start_process()
152    ticker_pid = recordingmanager.ticker.pid
153    assert ticker_pid is not None
154    recordingmanager.summaryrecorder.write_input_summary(ticker_pid=recordingmanager.ticker.pid)
155    # TODO hacky solution of decrementing and incrementing steps
156    variables.steps -= 1
157    recordingmanager.featherrecorder.write(bioreactor.population)
158    variables.steps += 1
159
160    # sim
161    recordingmanager.phenomaprecorder.write()
162
163    # Write initial checkpoint before the loop so there's always something to resume from
164    if parametermanager.parameters.CHECKPOINT_RATE > 0:
165        from aegis_sim.checkpoint import Checkpoint
166        initial_cp = Checkpoint.capture(bioreactor.population, bioreactor.eggs, variables, submodels, parametermanager)
167        initial_cp.save(recordingmanager.checkpointrecorder.checkpoint_path)
168        logging.debug("Initial checkpoint saved before sim loop.")
169
170    while (variables.steps <= parametermanager.parameters.STEPS_PER_SIMULATION) and not recordingmanager.is_extinct():
171        recordingmanager.progressrecorder.write(len(bioreactor.population))
172        recordingmanager.simpleprogressrecorder.write()
173        bioreactor.run_step()
174        variables.steps += 1
175
176    # postsim
177    recordingmanager.popsizerecorder.flush_all()
178    recordingmanager.resourcerecorder.flush_all()
179    recordingmanager.summaryrecorder.write_output_summary()
180    logging.info("Simulation finished.")
181    recordingmanager.ticker.stop_process()
def run( custom_config_path, pickle_path, overwrite, custom_input_params, resume_path=None, extend_steps=None):
12def run(custom_config_path, pickle_path, overwrite, custom_input_params, resume_path=None, extend_steps=None):
13    if resume_path is not None:
14        odir = pathlib.Path(resume_path)
15        checkpoint_file = odir / "checkpoint"
16
17        if not odir.exists():
18            # No output dir yet → fresh run
19            logging.info(f"No output directory at {odir}, starting fresh run.")
20            init(custom_config_path, overwrite=False, pickle_path=pickle_path, custom_input_params=custom_input_params)
21            population = (
22                Population.initialize(
23                    n=parametermanager.parameters.INITIAL_POPULATION_SIZE,
24                    AGE_LIMIT=parametermanager.parameters.AGE_LIMIT,
25                )
26                if pickle_path is None
27                else Population.load_pickle_from(pickle_path)
28            )
29            eggs = None
30        elif not checkpoint_file.exists():
31            # Output dir exists but no checkpoint → error
32            raise FileNotFoundError(
33                f"Output directory {odir} exists but contains no checkpoint file. "
34                f"Cannot resume. Use -o to overwrite, or delete the directory."
35            )
36        else:
37            # Output dir + checkpoint → resume
38            population, eggs = init_resume(resume_path, extend_steps=extend_steps)
39    else:
40        init(custom_config_path, overwrite, pickle_path, custom_input_params)
41        population = (
42            Population.initialize(
43                n=parametermanager.parameters.INITIAL_POPULATION_SIZE,
44                AGE_LIMIT=parametermanager.parameters.AGE_LIMIT,
45            )
46            if pickle_path is None
47            else Population.load_pickle_from(pickle_path)
48        )
49        eggs = None
50
51    bioreactor = Bioreactor(population)
52    bioreactor.eggs = eggs
53    sim(bioreactor=bioreactor)
def init( custom_config_path, overwrite=False, pickle_path=None, custom_input_params={}):
56def init(custom_config_path, overwrite=False, pickle_path=None, custom_input_params={}):
57    """
58    When testing aegis, initialize all modules using this function, e.g.
59
60    import aegis_sim
61    aegis_sim.init("_.yml")
62
63    And then you can safely import any module.
64    """
65
66    custom_config_path = pathlib.Path(custom_config_path)
67
68    parametermanager.init(
69        custom_config_path=custom_config_path,
70        custom_input_params=custom_input_params,
71    )
72    variables.init(
73        variables,
74        custom_config_path=custom_config_path,
75        pickle_path=pickle_path,
76        RANDOM_SEED=parametermanager.parameters.RANDOM_SEED,
77    )
78    parameterization.init_traits(parameterization)
79    submodels.init(submodels, parametermanager=parametermanager)
80
81    recordingmanager.init(custom_config_path, overwrite)
82    recordingmanager.initialize_recorders(TICKER_RATE=parametermanager.parameters.TICKER_RATE)

When testing aegis, initialize all modules using this function, e.g.

import aegis_sim aegis_sim.init("_.yml")

And then you can safely import any module.

def init_resume(resume_path, extend_steps=None):
 85def init_resume(resume_path, extend_steps=None):
 86    """Initialize all modules from the latest checkpoint in the given output directory.
 87
 88    Args:
 89        resume_path: Path to the output directory containing the checkpoint.
 90        extend_steps: If set, override STEPS_PER_SIMULATION to extend the run.
 91    """
 92    from aegis_sim.checkpoint import Checkpoint
 93
 94    odir = pathlib.Path(resume_path)
 95    checkpoint_path = Checkpoint.find_latest(odir)
 96    checkpoint = Checkpoint.load(checkpoint_path)
 97
 98    # Restore parameters from checkpoint config
 99    parametermanager.init_from_config(checkpoint.final_config, checkpoint.custom_config_path)
100
101    # Apply --extend override if provided
102    if extend_steps is not None:
103        if extend_steps <= checkpoint.step:
104            raise ValueError(
105                f"--extend {extend_steps} must be greater than checkpoint step {checkpoint.step}"
106            )
107        parametermanager.parameters.STEPS_PER_SIMULATION = extend_steps
108        parametermanager.final_config["STEPS_PER_SIMULATION"] = extend_steps
109        logging.info(f"Extending simulation to {extend_steps} steps (was {checkpoint.final_config['STEPS_PER_SIMULATION']}).")
110
111    # Restore variables (step, RNG state)
112    variables.restore_from_checkpoint(variables, checkpoint)
113
114    # Re-init traits and submodels
115    parameterization.init_traits(parameterization)
116    submodels.init(submodels, parametermanager=parametermanager)
117
118    # Restore envdrift map if it was active
119    if checkpoint.envdrift_map is not None:
120        submodels.architect.envdrift.map = checkpoint.envdrift_map
121
122    # Restore predator population size
123    submodels.predation.N = checkpoint.predator_population_size
124
125    # Restore resource capacity
126    from aegis_sim.submodels.resources.resources import resources
127    resources.capacity = checkpoint.resource_capacity
128
129    # Init recording in append mode (don't overwrite, don't write headers)
130    recordingmanager.init_for_resume(checkpoint.custom_config_path)
131    recordingmanager.initialize_recorders(
132        TICKER_RATE=parametermanager.parameters.TICKER_RATE,
133        resuming=True,
134    )
135
136    # Truncate output files to remove data recorded after the checkpoint step
137    recordingmanager.truncate_for_resume(checkpoint.step)
138
139    # Update TE recorder's file counter after truncation may have deleted files
140    te_dir = recordingmanager.odir / "te"
141    if te_dir.exists():
142        remaining = list(te_dir.glob("*.csv"))
143        recordingmanager.terecorder.TE_number = len(remaining)
144
145    return checkpoint.population, checkpoint.eggs

Initialize all modules from the latest checkpoint in the given output directory.

Args: resume_path: Path to the output directory containing the checkpoint. extend_steps: If set, override STEPS_PER_SIMULATION to extend the run.

def sim(bioreactor):
149def sim(bioreactor):
150    # presim
151    recordingmanager.configrecorder.write_final_config_file(parametermanager.final_config)
152    recordingmanager.ticker.start_process()
153    ticker_pid = recordingmanager.ticker.pid
154    assert ticker_pid is not None
155    recordingmanager.summaryrecorder.write_input_summary(ticker_pid=recordingmanager.ticker.pid)
156    # TODO hacky solution of decrementing and incrementing steps
157    variables.steps -= 1
158    recordingmanager.featherrecorder.write(bioreactor.population)
159    variables.steps += 1
160
161    # sim
162    recordingmanager.phenomaprecorder.write()
163
164    # Write initial checkpoint before the loop so there's always something to resume from
165    if parametermanager.parameters.CHECKPOINT_RATE > 0:
166        from aegis_sim.checkpoint import Checkpoint
167        initial_cp = Checkpoint.capture(bioreactor.population, bioreactor.eggs, variables, submodels, parametermanager)
168        initial_cp.save(recordingmanager.checkpointrecorder.checkpoint_path)
169        logging.debug("Initial checkpoint saved before sim loop.")
170
171    while (variables.steps <= parametermanager.parameters.STEPS_PER_SIMULATION) and not recordingmanager.is_extinct():
172        recordingmanager.progressrecorder.write(len(bioreactor.population))
173        recordingmanager.simpleprogressrecorder.write()
174        bioreactor.run_step()
175        variables.steps += 1
176
177    # postsim
178    recordingmanager.popsizerecorder.flush_all()
179    recordingmanager.resourcerecorder.flush_all()
180    recordingmanager.summaryrecorder.write_output_summary()
181    logging.info("Simulation finished.")
182    recordingmanager.ticker.stop_process()