aegis_sim.recording.recordingmanager
Data recorder
Records data generated by the simulation.
When thinking about recording additional data, consider that there are four recording methods: I. Snapshots (record data from the population at a specific step) II. Flushes (collect data over time then flush) III. One-time records IV. Other: TE records
"""Data recorder

Records data generated by the simulation.

When thinking about recording additional data, consider that there are four recording methods:
    I. Snapshots (record data from the population at a specific step)
    II. Flushes (collect data over time then flush)
    III. One-time records
    IV. Other: TE records
"""

import pathlib
import shutil
import logging

from aegis_sim import variables

from .terecorder import TERecorder
from .picklerecorder import PickleRecorder
from .popgenstatsrecorder import PopgenStatsRecorder
from .intervalrecorder import IntervalRecorder
from .flushrecorder import FlushRecorder
from .featherrecorder import FeatherRecorder
from .phenomaprecorder import PhenomapRecorder
from .summaryrecorder import SummaryRecorder
from .progressrecorder import ProgressRecorder
from .simpleprogressrecorder import SimpleProgressRecorder
from .popsizerecorder import PopsizeRecorder
from .resourcerecorder import ResourcesRecorder
from .ticker import Ticker
from .configrecorder import ConfigRecorder
from .envdriftmaprecorder import Envdriftmaprecorder
from .checkpointrecorder import CheckpointRecorder
from .ancestryrecorder import AncestryRecorder

# TODO write tests


class RecordingManager:
    """
    Container class for various recorders.
    Each recorder records a certain type of data.
    Most recorders record data as tables, except SummaryRecorder and PickleRecorder which record JSON files and pickles (a binary python format).
    Headers and indexes of all tabular files are explicitly recorded.

    -----
    GUI
    AEGIS records a lot of different data.
    In brief, AEGIS records
    genomic data (population-level allele frequencies and individual-level binary sequences) and
    phenotypic data (observed population-level phenotypes and intrinsic individual-level phenotypes),
    as well as
    derived demographic data (life, death and birth tables),
    population genetic data (e.g. effective population size, theta), and
    survival analysis data (TE / time-event tables).
    Furthermore, it records metadata (e.g. simulation log, processed configuration files) and python pickle files.

    Recorded data is distributed in multiple files.
    Almost all data are tabular, so each file is a table to which rows are appended as the simulation is running.
    The recording rates are frequencies at which rows are added; they are expressed in simulation steps.
    """

    def init(self, custom_config_path, overwrite):
        """Prepare a fresh (non-resumed) run.

        Args:
            custom_config_path: path of the configuration file; the output
                directory is derived from it by make_odir.
            overwrite: whether an already existing output directory may be deleted.
        """
        self.odir = self.make_odir(custom_config_path=custom_config_path, overwrite=overwrite)
        self.resuming = False

    def init_for_resume(self, custom_config_path):
        """Initialize for resume mode — reuse existing output directory, no overwrite.

        Raises:
            FileNotFoundError: if the output directory derived from
                custom_config_path does not exist.
        """
        output_path = custom_config_path.parent / custom_config_path.stem
        if not output_path.exists():
            raise FileNotFoundError(
                f"Cannot resume: output directory {output_path} does not exist."
            )
        self.odir = output_path
        self.resuming = True

    def truncate_for_resume(self, checkpoint_step):
        """Truncate output files back to the checkpoint step to avoid duplicate data.

        Requires self.odir to be set. The checkpoint is saved at the end of
        run_step for a given step, and all recorders also write during that same
        run_step. So the output files already contain data for checkpoint_step.
        On resume the sim loop re-executes from checkpoint_step, so data from
        checkpoint_step onward is removed.

        For per-step files (1 line per step, no header), the number of lines to
        keep is checkpoint_step - 1 (steps 1 through checkpoint_step-1).

        For rate-based files (header line(s), then one data line at step 1 and
        one at every step divisible by RATE), the number of data lines to keep
        is the number of such recording steps in 1..checkpoint_step-1 (see
        _count_recordings), plus the header line(s).

        TE files are handled by _truncate_te_files.
        """
        # Imported here rather than at module level, as in the original code.
        from aegis_sim.parameterization import parametermanager

        params = parametermanager.parameters
        step = checkpoint_step
        last_kept_step = step - 1

        # Per-step files: 1 line per step, no header.
        per_step_files = [
            "popsize_before_reproduction.csv",
            "popsize_after_reproduction.csv",
            "eggnum_after_reproduction.csv",
            "resources_before_scavenging.csv",
            "resources_after_scavenging.csv",
        ]
        for fname in per_step_files:
            self._truncate_file(self.odir / fname, keep_lines=last_kept_step)

        # Rate-based files with 1 header line.
        rate_specs_1header = [
            ("progress.log", "LOGGING_RATE"),
        ]
        for fname, rate_name in rate_specs_1header:
            rate = getattr(params, rate_name)
            n_data = self._count_recordings(last_kept_step, rate)
            self._truncate_file(self.odir / fname, keep_lines=1 + n_data)

        # Both the spectra and the genotype/phenotype tables are written at INTERVAL_RATE.
        n_interval = self._count_recordings(last_kept_step, params.INTERVAL_RATE)

        # FlushRecorder spectra: 1 header + data at INTERVAL_RATE
        spectra_dir = self.odir / "gui" / "spectra"
        if spectra_dir.exists():
            for csv_file in spectra_dir.glob("*.csv"):
                self._truncate_file(csv_file, keep_lines=1 + n_interval)

        # IntervalRecorder: 2 header lines + data at INTERVAL_RATE
        for fname in ["gui/genotypes.csv", "gui/phenotypes.csv"]:
            self._truncate_file(self.odir / fname, keep_lines=2 + n_interval)

        # PopgenStatsRecorder: no header, data at POPGENSTATS_RATE
        popgen_dir = self.odir / "popgen"
        if popgen_dir.exists():
            n_data = self._count_recordings(last_kept_step, params.POPGENSTATS_RATE)
            for csv_file in popgen_dir.glob("*.csv"):
                self._truncate_file(csv_file, keep_lines=n_data)

        # Envdriftmap: no header, writes when step % ENVDRIFT_RATE == 0 (NOT at step 1 unless divisible).
        # This differs from skip() logic, so we count multiples directly.
        rate = params.ENVDRIFT_RATE
        if rate > 0:
            n_data = last_kept_step // rate  # multiples of rate in [1, step-1]
            self._truncate_file(self.odir / "envdriftmap.csv", keep_lines=n_data)

        # TE files: numbered CSV files in te/ directory.
        te_rate = params.TE_RATE
        te_duration = params.TE_DURATION
        if te_rate > 0:
            te_dir = self.odir / "te"
            if te_dir.exists():
                self._truncate_te_files(te_dir, step, te_rate, te_duration)

        logging.info(f"Output files truncated to checkpoint step {step}.")

    @staticmethod
    def _truncate_te_files(te_dir, checkpoint_step, te_rate, te_duration):
        """Delete TE files whose collection window started at or after the checkpoint.

        TE files are numbered 0.csv, 1.csv, etc. Window 0 is taken to start at
        step 1 and window k (k >= 1) at step k * te_rate. Files whose window
        start is >= checkpoint_step are deleted.

        Files whose window started before the checkpoint are left untouched,
        even if they may contain events from steps >= checkpoint_step: TE files
        have 1 header line ("T,E") followed by one line per death event, and
        lines cannot easily be mapped back to steps. The old partial data is
        considered acceptable (it is death events that actually happened).

        Args:
            te_dir: directory containing the numbered TE csv files.
            checkpoint_step: step from which the simulation will be re-executed.
            te_rate: number of steps between the starts of consecutive windows.
            te_duration: currently unused; kept so the signature stays stable.
        """
        # Only numbered files are TE windows; other csv files are ignored
        # instead of crashing on int(). The list is built before deleting.
        numbered_files = [p for p in te_dir.glob("*.csv") if p.stem.isdecimal()]
        for te_file in numbered_files:
            file_num = int(te_file.stem)
            window_start = 1 if file_num == 0 else file_num * te_rate
            if window_start >= checkpoint_step:
                te_file.unlink()

    @staticmethod
    def _count_recordings(up_to_step, rate):
        """Count how many times a recorder with given rate would have written for steps 1..up_to_step.

        Mirrors the skip() logic: always write at step 1, then write at every step divisible by rate.
        Returns 0 when rate <= 0 or up_to_step < 1.
        """
        if rate <= 0 or up_to_step < 1:
            return 0
        multiples = up_to_step // rate  # multiples of rate in [1, up_to_step]
        # Step 1 always records; it is itself a multiple only when rate == 1,
        # in which case it must not be counted twice.
        return multiples if rate == 1 else 1 + multiples

    @staticmethod
    def _truncate_file(path, keep_lines):
        """Truncate a file in place to keep only the first `keep_lines` lines.

        Does nothing if the file does not exist or has at most `keep_lines`
        lines. A negative `keep_lines` is treated as 0 (keep nothing).
        """
        if not path.exists():
            return
        keep_lines = max(keep_lines, 0)
        with open(path, "rb+") as f:
            # Skip over the lines to keep without loading the whole file.
            for _ in range(keep_lines):
                if not f.readline():
                    return  # Fewer lines than requested; nothing to truncate
            cut_at = f.tell()
            if f.read(1):  # Only rewrite the file when something follows
                f.truncate(cut_at)

    def initialize_recorders(self, TICKER_RATE, resuming=False):
        """Instantiate all recorders with self.odir as their output directory.

        Args:
            TICKER_RATE: passed to Ticker.
            resuming: forwarded to the recorders that accept it (TE, interval,
                flush and progress recorders).
        """
        self.terecorder = TERecorder(odir=self.odir, resuming=resuming)
        self.picklerecorder = PickleRecorder(odir=self.odir)
        self.popgenstatsrecorder = PopgenStatsRecorder(odir=self.odir)
        self.guirecorder = IntervalRecorder(odir=self.odir, resuming=resuming)
        self.flushrecorder = FlushRecorder(odir=self.odir, resuming=resuming)
        self.featherrecorder = FeatherRecorder(odir=self.odir)
        self.phenomaprecorder = PhenomapRecorder(odir=self.odir)
        self.summaryrecorder = SummaryRecorder(odir=self.odir)
        self.progressrecorder = ProgressRecorder(odir=self.odir, resuming=resuming)
        self.simpleprogressrecorder = SimpleProgressRecorder(odir=self.odir)
        self.ticker = Ticker(odir=self.odir, TICKER_RATE=TICKER_RATE)
        self.popsizerecorder = PopsizeRecorder(odir=self.odir)
        self.resourcerecorder = ResourcesRecorder(odir=self.odir)
        self.configrecorder = ConfigRecorder(odir=self.odir)
        self.envdriftmaprecorder = Envdriftmaprecorder(odir=self.odir)
        self.checkpointrecorder = CheckpointRecorder(odir=self.odir)
        self.ancestryrecorder = AncestryRecorder(odir=self.odir)

    #############
    # UTILITIES #
    #############

    @staticmethod
    def make_odir(custom_config_path, overwrite) -> pathlib.Path:
        """Return the output directory path for a config file, clearing an old one if allowed.

        The path is the config path without its suffix. The directory itself is
        NOT created here.

        Raises:
            FileExistsError: if the directory already exists and overwrite is false.
        """
        output_path = custom_config_path.parent / custom_config_path.stem  # remove .yml
        if output_path.is_dir():
            if not overwrite:
                raise FileExistsError(f"{output_path} already exists. To overwrite, add flag --overwrite or -o.")
            shutil.rmtree(output_path)
        return output_path

    @staticmethod
    def make_subfolders(paths):
        """Create every directory in `paths`, including missing parents."""
        # TODO relink paths that are now in classes
        for path in paths:
            path.mkdir(exist_ok=True, parents=True)

    def is_extinct(self) -> bool:
        """Return True (and log the current step) if the summary recorder flags extinction."""
        if self.summaryrecorder.extinct:
            logging.info(f"Population went extinct (at step {variables.steps}).")
            return True
        return False
class RecordingManager:
    """
    Container class for various recorders.
    Each recorder records a certain type of data.
    Most recorders record data as tables, except SummaryRecorder and PickleRecorder which record JSON files and pickles (a binary python format).
    Headers and indexes of all tabular files are explicitly recorded.

    -----
    GUI
    AEGIS records a lot of different data.
    In brief, AEGIS records
    genomic data (population-level allele frequencies and individual-level binary sequences) and
    phenotypic data (observed population-level phenotypes and intrinsic individual-level phenotypes),
    as well as
    derived demographic data (life, death and birth tables),
    population genetic data (e.g. effective population size, theta), and
    survival analysis data (TE / time-event tables).
    Furthermore, it records metadata (e.g. simulation log, processed configuration files) and python pickle files.

    Recorded data is distributed in multiple files.
    Almost all data are tabular, so each file is a table to which rows are appended as the simulation is running.
    The recording rates are frequencies at which rows are added; they are expressed in simulation steps.
    """

    def init(self, custom_config_path, overwrite):
        """Prepare a fresh (non-resumed) run.

        Args:
            custom_config_path: path of the configuration file; the output
                directory is derived from it by make_odir.
            overwrite: whether an already existing output directory may be deleted.
        """
        self.odir = self.make_odir(custom_config_path=custom_config_path, overwrite=overwrite)
        self.resuming = False

    def init_for_resume(self, custom_config_path):
        """Initialize for resume mode — reuse existing output directory, no overwrite.

        Raises:
            FileNotFoundError: if the output directory derived from
                custom_config_path does not exist.
        """
        output_path = custom_config_path.parent / custom_config_path.stem
        if not output_path.exists():
            raise FileNotFoundError(
                f"Cannot resume: output directory {output_path} does not exist."
            )
        self.odir = output_path
        self.resuming = True

    def truncate_for_resume(self, checkpoint_step):
        """Truncate output files back to the checkpoint step to avoid duplicate data.

        Requires self.odir to be set. The checkpoint is saved at the end of
        run_step for a given step, and all recorders also write during that same
        run_step. So the output files already contain data for checkpoint_step.
        On resume the sim loop re-executes from checkpoint_step, so data from
        checkpoint_step onward is removed.

        For per-step files (1 line per step, no header), the number of lines to
        keep is checkpoint_step - 1 (steps 1 through checkpoint_step-1).

        For rate-based files (header line(s), then one data line at step 1 and
        one at every step divisible by RATE), the number of data lines to keep
        is the number of such recording steps in 1..checkpoint_step-1 (see
        _count_recordings), plus the header line(s).

        TE files are handled by _truncate_te_files.
        """
        # Imported here rather than at module level, as in the original code.
        from aegis_sim.parameterization import parametermanager

        params = parametermanager.parameters
        step = checkpoint_step
        last_kept_step = step - 1

        # Per-step files: 1 line per step, no header.
        per_step_files = [
            "popsize_before_reproduction.csv",
            "popsize_after_reproduction.csv",
            "eggnum_after_reproduction.csv",
            "resources_before_scavenging.csv",
            "resources_after_scavenging.csv",
        ]
        for fname in per_step_files:
            self._truncate_file(self.odir / fname, keep_lines=last_kept_step)

        # Rate-based files with 1 header line.
        rate_specs_1header = [
            ("progress.log", "LOGGING_RATE"),
        ]
        for fname, rate_name in rate_specs_1header:
            rate = getattr(params, rate_name)
            n_data = self._count_recordings(last_kept_step, rate)
            self._truncate_file(self.odir / fname, keep_lines=1 + n_data)

        # Both the spectra and the genotype/phenotype tables are written at INTERVAL_RATE.
        n_interval = self._count_recordings(last_kept_step, params.INTERVAL_RATE)

        # FlushRecorder spectra: 1 header + data at INTERVAL_RATE
        spectra_dir = self.odir / "gui" / "spectra"
        if spectra_dir.exists():
            for csv_file in spectra_dir.glob("*.csv"):
                self._truncate_file(csv_file, keep_lines=1 + n_interval)

        # IntervalRecorder: 2 header lines + data at INTERVAL_RATE
        for fname in ["gui/genotypes.csv", "gui/phenotypes.csv"]:
            self._truncate_file(self.odir / fname, keep_lines=2 + n_interval)

        # PopgenStatsRecorder: no header, data at POPGENSTATS_RATE
        popgen_dir = self.odir / "popgen"
        if popgen_dir.exists():
            n_data = self._count_recordings(last_kept_step, params.POPGENSTATS_RATE)
            for csv_file in popgen_dir.glob("*.csv"):
                self._truncate_file(csv_file, keep_lines=n_data)

        # Envdriftmap: no header, writes when step % ENVDRIFT_RATE == 0 (NOT at step 1 unless divisible).
        # This differs from skip() logic, so we count multiples directly.
        rate = params.ENVDRIFT_RATE
        if rate > 0:
            n_data = last_kept_step // rate  # multiples of rate in [1, step-1]
            self._truncate_file(self.odir / "envdriftmap.csv", keep_lines=n_data)

        # TE files: numbered CSV files in te/ directory.
        te_rate = params.TE_RATE
        te_duration = params.TE_DURATION
        if te_rate > 0:
            te_dir = self.odir / "te"
            if te_dir.exists():
                self._truncate_te_files(te_dir, step, te_rate, te_duration)

        logging.info(f"Output files truncated to checkpoint step {step}.")

    @staticmethod
    def _truncate_te_files(te_dir, checkpoint_step, te_rate, te_duration):
        """Delete TE files whose collection window started at or after the checkpoint.

        TE files are numbered 0.csv, 1.csv, etc. Window 0 is taken to start at
        step 1 and window k (k >= 1) at step k * te_rate. Files whose window
        start is >= checkpoint_step are deleted.

        Files whose window started before the checkpoint are left untouched,
        even if they may contain events from steps >= checkpoint_step: TE files
        have 1 header line ("T,E") followed by one line per death event, and
        lines cannot easily be mapped back to steps. The old partial data is
        considered acceptable (it is death events that actually happened).

        Args:
            te_dir: directory containing the numbered TE csv files.
            checkpoint_step: step from which the simulation will be re-executed.
            te_rate: number of steps between the starts of consecutive windows.
            te_duration: currently unused; kept so the signature stays stable.
        """
        # Only numbered files are TE windows; other csv files are ignored
        # instead of crashing on int(). The list is built before deleting.
        numbered_files = [p for p in te_dir.glob("*.csv") if p.stem.isdecimal()]
        for te_file in numbered_files:
            file_num = int(te_file.stem)
            window_start = 1 if file_num == 0 else file_num * te_rate
            if window_start >= checkpoint_step:
                te_file.unlink()

    @staticmethod
    def _count_recordings(up_to_step, rate):
        """Count how many times a recorder with given rate would have written for steps 1..up_to_step.

        Mirrors the skip() logic: always write at step 1, then write at every step divisible by rate.
        Returns 0 when rate <= 0 or up_to_step < 1.
        """
        if rate <= 0 or up_to_step < 1:
            return 0
        multiples = up_to_step // rate  # multiples of rate in [1, up_to_step]
        # Step 1 always records; it is itself a multiple only when rate == 1,
        # in which case it must not be counted twice.
        return multiples if rate == 1 else 1 + multiples

    @staticmethod
    def _truncate_file(path, keep_lines):
        """Truncate a file in place to keep only the first `keep_lines` lines.

        Does nothing if the file does not exist or has at most `keep_lines`
        lines. A negative `keep_lines` is treated as 0 (keep nothing).
        """
        if not path.exists():
            return
        keep_lines = max(keep_lines, 0)
        with open(path, "rb+") as f:
            # Skip over the lines to keep without loading the whole file.
            for _ in range(keep_lines):
                if not f.readline():
                    return  # Fewer lines than requested; nothing to truncate
            cut_at = f.tell()
            if f.read(1):  # Only rewrite the file when something follows
                f.truncate(cut_at)

    def initialize_recorders(self, TICKER_RATE, resuming=False):
        """Instantiate all recorders with self.odir as their output directory.

        Args:
            TICKER_RATE: passed to Ticker.
            resuming: forwarded to the recorders that accept it (TE, interval,
                flush and progress recorders).
        """
        self.terecorder = TERecorder(odir=self.odir, resuming=resuming)
        self.picklerecorder = PickleRecorder(odir=self.odir)
        self.popgenstatsrecorder = PopgenStatsRecorder(odir=self.odir)
        self.guirecorder = IntervalRecorder(odir=self.odir, resuming=resuming)
        self.flushrecorder = FlushRecorder(odir=self.odir, resuming=resuming)
        self.featherrecorder = FeatherRecorder(odir=self.odir)
        self.phenomaprecorder = PhenomapRecorder(odir=self.odir)
        self.summaryrecorder = SummaryRecorder(odir=self.odir)
        self.progressrecorder = ProgressRecorder(odir=self.odir, resuming=resuming)
        self.simpleprogressrecorder = SimpleProgressRecorder(odir=self.odir)
        self.ticker = Ticker(odir=self.odir, TICKER_RATE=TICKER_RATE)
        self.popsizerecorder = PopsizeRecorder(odir=self.odir)
        self.resourcerecorder = ResourcesRecorder(odir=self.odir)
        self.configrecorder = ConfigRecorder(odir=self.odir)
        self.envdriftmaprecorder = Envdriftmaprecorder(odir=self.odir)
        self.checkpointrecorder = CheckpointRecorder(odir=self.odir)
        self.ancestryrecorder = AncestryRecorder(odir=self.odir)

    #############
    # UTILITIES #
    #############

    @staticmethod
    def make_odir(custom_config_path, overwrite) -> pathlib.Path:
        """Return the output directory path for a config file, clearing an old one if allowed.

        The path is the config path without its suffix. The directory itself is
        NOT created here.

        Raises:
            FileExistsError: if the directory already exists and overwrite is false.
        """
        output_path = custom_config_path.parent / custom_config_path.stem  # remove .yml
        if output_path.is_dir():
            if not overwrite:
                raise FileExistsError(f"{output_path} already exists. To overwrite, add flag --overwrite or -o.")
            shutil.rmtree(output_path)
        return output_path

    @staticmethod
    def make_subfolders(paths):
        """Create every directory in `paths`, including missing parents."""
        # TODO relink paths that are now in classes
        for path in paths:
            path.mkdir(exist_ok=True, parents=True)

    def is_extinct(self) -> bool:
        """Return True (and log the current step) if the summary recorder flags extinction."""
        if self.summaryrecorder.extinct:
            logging.info(f"Population went extinct (at step {variables.steps}).")
            return True
        return False
Container class for various recorders. Each recorder records a certain type of data. Most recorders record data as tables, except SummaryRecorder and PickleRecorder which record JSON files and pickles (a binary python format). Headers and indexes of all tabular files are explicitly recorded.
GUI AEGIS records a lot of different data. In brief, AEGIS records genomic data (population-level allele frequencies and individual-level binary sequences) and phenotypic data (observed population-level phenotypes and intrinsic individual-level phenotypes), as well as derived demographic data (life, death and birth tables), population genetic data (e.g. effective population size, theta), and survival analysis data (TE / time-event tables). Furthermore, it records metadata (e.g. simulation log, processed configuration files) and python pickle files.
Recorded data is distributed in multiple files. Almost all data are tabular, so each file is a table to which rows are appended as the simulation is running. The recording rates are frequencies at which rows are added; they are expressed in simulation steps.
def init_for_resume(self, custom_config_path):
    """Set the manager up to continue a previous run.

    The output directory sits next to the configuration file and is named
    after it without the suffix. It must already exist; it is reused as-is.

    Raises:
        FileNotFoundError: if that directory does not exist.
    """
    resume_dir = custom_config_path.parent.joinpath(custom_config_path.stem)
    if resume_dir.exists():
        self.odir = resume_dir
        self.resuming = True
        return
    raise FileNotFoundError(f"Cannot resume: output directory {resume_dir} does not exist.")
Initialize for resume mode — reuse existing output directory, no overwrite.
def truncate_for_resume(self, checkpoint_step):
    """Cut the output files back to just before the checkpoint step.

    Requires self.odir to be set. According to the original notes, the
    checkpoint is saved at the end of run_step and the recorders write during
    that same run_step, so the files already hold data for checkpoint_step;
    the resumed loop re-executes that step, so everything from
    checkpoint_step onward is dropped here to avoid duplicates.

    Per-step files (no header, 1 line per step) keep checkpoint_step - 1 lines.
    Rate-based files keep their header line(s) plus one line per recording
    step in 1..checkpoint_step-1, as counted by _count_recordings (step 1 and
    every step divisible by the rate). envdriftmap.csv only counts the steps
    divisible by its rate. TE files are delegated to _truncate_te_files.
    """
    from aegis_sim.parameterization import parametermanager

    params = parametermanager.parameters
    out = self.odir
    completed = checkpoint_step - 1  # last step whose records are kept

    # Files with one headerless line per step.
    for per_step_name in (
        "popsize_before_reproduction.csv",
        "popsize_after_reproduction.csv",
        "eggnum_after_reproduction.csv",
        "resources_before_scavenging.csv",
        "resources_after_scavenging.csv",
    ):
        self._truncate_file(out / per_step_name, keep_lines=completed)

    # progress.log: 1 header line, rows at LOGGING_RATE.
    logged_rows = self._count_recordings(completed, getattr(params, "LOGGING_RATE"))
    self._truncate_file(out / "progress.log", keep_lines=1 + logged_rows)

    # Spectra (1 header line) and genotype/phenotype tables (2 header lines)
    # share INTERVAL_RATE, so the row count is the same for both.
    interval_rows = self._count_recordings(completed, params.INTERVAL_RATE)
    spectra = out / "gui" / "spectra"
    if spectra.exists():
        for spectrum_csv in spectra.glob("*.csv"):
            self._truncate_file(spectrum_csv, keep_lines=1 + interval_rows)
    for gui_name in ("gui/genotypes.csv", "gui/phenotypes.csv"):
        self._truncate_file(out / gui_name, keep_lines=2 + interval_rows)

    # Popgen tables: no header, rows at POPGENSTATS_RATE.
    popgen = out / "popgen"
    if popgen.exists():
        popgen_rows = self._count_recordings(completed, params.POPGENSTATS_RATE)
        for popgen_csv in popgen.glob("*.csv"):
            self._truncate_file(popgen_csv, keep_lines=popgen_rows)

    # Envdriftmap: no header, one row per step divisible by ENVDRIFT_RATE
    # (no unconditional row at step 1, unlike the recorders above).
    drift_rate = params.ENVDRIFT_RATE
    if drift_rate > 0:
        self._truncate_file(out / "envdriftmap.csv", keep_lines=completed // drift_rate)

    # TE files: numbered csv files under te/.
    window_rate = params.TE_RATE
    window_duration = params.TE_DURATION
    if window_rate > 0 and (out / "te").exists():
        self._truncate_te_files(out / "te", checkpoint_step, window_rate, window_duration)

    logging.info(f"Output files truncated to checkpoint step {checkpoint_step}.")
Truncate output files back to the checkpoint step to avoid duplicate data.
Called after initialize_recorders so self.odir is set. The checkpoint is saved at the end of run_step for a given step, and all recorders also write during that same run_step. So the output files already contain data for checkpoint_step. On resume the sim loop re-executes from checkpoint_step, so we need to remove data from checkpoint_step onward.
For per-step files (1 line per step, no header), the number of lines to keep is checkpoint_step - 1 (steps 1 through checkpoint_step-1).
For rate-based files (header line(s), then one data line at step 1 and one at every step divisible by RATE), the number of data lines to keep is the number of such recording steps in 1..checkpoint_step-1, plus the header line(s).
def initialize_recorders(self, TICKER_RATE, resuming=False):
    """Create every recorder and attach it to the manager.

    All recorders receive self.odir as `odir`. The TE, interval, flush and
    progress recorders additionally receive `resuming`; the ticker receives
    TICKER_RATE. Recorders are constructed in the order listed below.
    """
    target = self.odir
    plain = {"odir": target}
    resumable = {"odir": target, "resuming": resuming}

    # (attribute name, recorder class, constructor keyword arguments)
    recorder_specs = (
        ("terecorder", TERecorder, resumable),
        ("picklerecorder", PickleRecorder, plain),
        ("popgenstatsrecorder", PopgenStatsRecorder, plain),
        ("guirecorder", IntervalRecorder, resumable),
        ("flushrecorder", FlushRecorder, resumable),
        ("featherrecorder", FeatherRecorder, plain),
        ("phenomaprecorder", PhenomapRecorder, plain),
        ("summaryrecorder", SummaryRecorder, plain),
        ("progressrecorder", ProgressRecorder, resumable),
        ("simpleprogressrecorder", SimpleProgressRecorder, plain),
        ("ticker", Ticker, {"odir": target, "TICKER_RATE": TICKER_RATE}),
        ("popsizerecorder", PopsizeRecorder, plain),
        ("resourcerecorder", ResourcesRecorder, plain),
        ("configrecorder", ConfigRecorder, plain),
        ("envdriftmaprecorder", Envdriftmaprecorder, plain),
        ("checkpointrecorder", CheckpointRecorder, plain),
        ("ancestryrecorder", AncestryRecorder, plain),
    )
    for attribute, recorder_cls, kwargs in recorder_specs:
        setattr(self, attribute, recorder_cls(**kwargs))
@staticmethod
def make_odir(custom_config_path, overwrite) -> pathlib.Path:
    """Return the output directory path for a config file, clearing an old one if allowed.

    The path is the config path without its suffix (e.g. without .yml),
    in the same parent directory. The directory itself is NOT created here.

    Args:
        custom_config_path: path of the configuration file.
        overwrite: if true, an existing output directory is deleted recursively.

    Raises:
        FileExistsError: if the directory already exists and overwrite is false.
    """
    output_path = custom_config_path.parent / custom_config_path.stem  # remove .yml
    if output_path.is_dir():  # is_dir() is already False for a missing path
        if not overwrite:
            raise FileExistsError(f"{output_path} already exists. To overwrite, add flag --overwrite or -o.")
        shutil.rmtree(output_path)
    return output_path