aegis_sim.recording.recordingmanager
Data recorder
Records data generated by the simulation.
When thinking about recording additional data, consider that there are four recording methods: I. Snapshots (record data from the population at a specific step) II. Flushes (collect data over time then flush) III. One-time records IV. Other: TE records
1"""Data recorder 2 3Records data generated by the simulation. 4 5When thinking about recording additional data, consider that there are three recording methods: 6 I. Snapshots (record data from the population at a specific step) 7 II. Flushes (collect data over time then flush) 8 III. One-time records 9 IV. Other: TE records 10""" 11 12import pathlib 13import shutil 14import logging 15 16from aegis_sim import variables 17 18from .terecorder import TERecorder 19from .picklerecorder import PickleRecorder 20from .popgenstatsrecorder import PopgenStatsRecorder 21from .intervalrecorder import IntervalRecorder 22from .flushrecorder import FlushRecorder 23from .featherrecorder import FeatherRecorder 24from .phenomaprecorder import PhenomapRecorder 25from .summaryrecorder import SummaryRecorder 26from .progressrecorder import ProgressRecorder 27from .simpleprogressrecorder import SimpleProgressRecorder 28from .popsizerecorder import PopsizeRecorder 29from .resourcerecorder import ResourcesRecorder 30from .ticker import Ticker 31from .configrecorder import ConfigRecorder 32from .envdriftmaprecorder import Envdriftmaprecorder 33from .checkpointrecorder import CheckpointRecorder 34 35# TODO write tests 36 37 38class RecordingManager: 39 """ 40 Container class for various recorders. 41 Each recorder records a certain type of data. 42 Most recorders record data as tables, except SummaryRecorder and PickleRecorder which record JSON files and pickles (a binary python format). 43 Headers and indexes of all tabular files are explicitly recorded. 44 45 ----- 46 GUI 47 AEGIS records a lot of different data. 48 In brief, AEGIS records 49 genomic data (population-level allele frequencies and individual-level binary sequences) and 50 phenotypic data (observed population-level phenotypes and intrinsic individual-level phenotypes), 51 as well as 52 derived demographic data (life, death and birth tables), 53 population genetic data (e.g. 
effective population size, theta), and 54 survival analysis data (TE / time-event tables). 55 Furthermore, it records metadata (e.g. simulation log, processed configuration files) and python pickle files. 56 57 Recorded data is distributed in multiple files. 58 Almost all data are tabular, so each file is a table to which rows are appended as the simulation is running. 59 The recording rates are frequencies at which rows are added; they are expressed in simulation steps. 60 """ 61 62 def init(self, custom_config_path, overwrite): 63 self.odir = self.make_odir(custom_config_path=custom_config_path, overwrite=overwrite) 64 self.resuming = False 65 66 def init_for_resume(self, custom_config_path): 67 """Initialize for resume mode — reuse existing output directory, no overwrite.""" 68 output_path = custom_config_path.parent / custom_config_path.stem 69 if not output_path.exists(): 70 raise FileNotFoundError( 71 f"Cannot resume: output directory {output_path} does not exist." 72 ) 73 self.odir = output_path 74 self.resuming = True 75 76 def truncate_for_resume(self, checkpoint_step): 77 """Truncate output files back to the checkpoint step to avoid duplicate data. 78 79 Called after initialize_recorders so self.odir is set. The checkpoint is 80 saved at the end of run_step for a given step, and all recorders also 81 write during that same run_step. So the output files already contain data 82 for checkpoint_step. On resume the sim loop re-executes from checkpoint_step, 83 so we need to remove data from checkpoint_step onward. 84 85 For per-step files (1 line per step, no header), the number of lines to 86 keep is checkpoint_step - 1 (steps 1 through checkpoint_step-1). 87 88 For rate-based files (1 header line + 1 data line every RATE steps), 89 the number of data lines to keep is (checkpoint_step - 1) // RATE, 90 plus the header line(s). 
91 """ 92 from aegis_sim.parameterization import parametermanager 93 94 step = checkpoint_step 95 96 # Per-step files: 1 line per step, no header. Keep step-1 lines. 97 per_step_files = [ 98 "popsize_before_reproduction.csv", 99 "popsize_after_reproduction.csv", 100 "eggnum_after_reproduction.csv", 101 "resources_before_scavenging.csv", 102 "resources_after_scavenging.csv", 103 ] 104 for fname in per_step_files: 105 self._truncate_file(self.odir / fname, keep_lines=step - 1) 106 107 # Rate-based files with 1 header line. 108 # Data is written when step % RATE == 0 or step == 1. 109 # Number of data lines at step S: 1 (for step 1) + count of multiples of RATE in [2, S-1] 110 # Simpler: lines where skip() returns False for steps 1..S-1 111 112 rate_specs_1header = [ 113 ("progress.log", "LOGGING_RATE"), 114 ] 115 for fname, rate_name in rate_specs_1header: 116 rate = getattr(parametermanager.parameters, rate_name) 117 n_data = self._count_recordings(step - 1, rate) 118 self._truncate_file(self.odir / fname, keep_lines=1 + n_data) 119 120 # FlushRecorder spectra: 1 header + data at INTERVAL_RATE 121 spectra_dir = self.odir / "gui" / "spectra" 122 if spectra_dir.exists(): 123 rate = parametermanager.parameters.INTERVAL_RATE 124 n_data = self._count_recordings(step - 1, rate) 125 for csv_file in spectra_dir.glob("*.csv"): 126 self._truncate_file(csv_file, keep_lines=1 + n_data) 127 128 # IntervalRecorder: 2 header lines + data at INTERVAL_RATE 129 rate = parametermanager.parameters.INTERVAL_RATE 130 n_data = self._count_recordings(step - 1, rate) 131 for fname in ["gui/genotypes.csv", "gui/phenotypes.csv"]: 132 self._truncate_file(self.odir / fname, keep_lines=2 + n_data) 133 134 # PopgenStatsRecorder: no header, data at POPGENSTATS_RATE 135 popgen_dir = self.odir / "popgen" 136 if popgen_dir.exists(): 137 rate = parametermanager.parameters.POPGENSTATS_RATE 138 n_data = self._count_recordings(step - 1, rate) 139 for csv_file in popgen_dir.glob("*.csv"): 140 
self._truncate_file(csv_file, keep_lines=n_data) 141 142 # Envdriftmap: no header, writes when step % ENVDRIFT_RATE == 0 (NOT at step 1 unless divisible). 143 # This differs from skip() logic, so we count multiples directly. 144 rate = parametermanager.parameters.ENVDRIFT_RATE 145 if rate > 0: 146 n_data = (step - 1) // rate # multiples of rate in [1, step-1] 147 self._truncate_file(self.odir / "envdriftmap.csv", keep_lines=n_data) 148 149 # TE files: numbered CSV files in te/ directory. 150 # A new TE file starts at step 1 and every TE_RATE steps. 151 # TE_number increments when a file is flushed (at TE_DURATION offset or final step). 152 # We need to figure out which TE file was in-progress at the checkpoint step 153 # and delete any files started after it. 154 te_rate = parametermanager.parameters.TE_RATE 155 te_duration = parametermanager.parameters.TE_DURATION 156 if te_rate > 0: 157 te_dir = self.odir / "te" 158 if te_dir.exists(): 159 self._truncate_te_files(te_dir, step, te_rate, te_duration) 160 161 logging.info(f"Output files truncated to checkpoint step {step}.") 162 163 @staticmethod 164 def _truncate_te_files(te_dir, checkpoint_step, te_rate, te_duration): 165 """Handle TE file truncation on resume. 166 167 TE files are numbered 0.csv, 1.csv, etc. A new collection window opens 168 at step 1 and every TE_RATE steps. The file number increments when the 169 window is flushed. We need to: 170 1. Determine how many complete TE windows finished before checkpoint_step 171 2. Delete any TE files beyond that 172 3. Truncate the in-progress TE file (remove data recorded at/after checkpoint_step) 173 """ 174 # Count how many TE windows were fully completed before checkpoint_step. 175 # A window starts at step S where S % te_rate == 0 (or step 1). 176 # It flushes at S + te_duration (or at STEPS_PER_SIMULATION). 177 # Window 0 starts at step 1, flushes at step te_duration (if te_duration < te_rate). 
178 # Window i starts at step i*te_rate, flushes at step i*te_rate + te_duration. 179 # A window is "complete" if its flush step < checkpoint_step. 180 181 # Number of complete windows: windows that started AND flushed before checkpoint_step 182 # Window starts: step 1, te_rate, 2*te_rate, 3*te_rate, ... 183 # The window starting at step W flushes at step W + te_duration. 184 # Complete if W + te_duration < checkpoint_step. 185 186 # For simplicity, just count existing TE files and remove those whose 187 # start step >= checkpoint_step. 188 # Window 0 starts at step 1. 189 # Window k starts at step k * te_rate (for k >= 1), or step 1 for k=0. 190 191 existing_files = sorted(te_dir.glob("*.csv"), key=lambda p: int(p.stem)) 192 for f in existing_files: 193 file_num = int(f.stem) 194 # Window file_num starts at: step 1 if file_num==0, else file_num * te_rate 195 window_start = 1 if file_num == 0 else file_num * te_rate 196 if window_start >= checkpoint_step: 197 # This window started at or after checkpoint — delete it 198 f.unlink() 199 # If the window started before checkpoint but may contain data from 200 # steps >= checkpoint_step, we need to truncate those lines. 201 # TE files have: 1 header line ("T,E"), then data lines for each 202 # death event recorded during the window. We can't easily map lines 203 # to steps, so we leave partial windows as-is. The resumed sim will 204 # re-open a new window at the appropriate step, and the old partial 205 # data from the interrupted window is acceptable (it's death events 206 # that actually happened). 207 208 @staticmethod 209 def _count_recordings(up_to_step, rate): 210 """Count how many times a recorder with given rate would have written for steps 1..up_to_step. 211 212 Mirrors the skip() logic: always write at step 1, then write at every step divisible by rate. 
213 """ 214 if rate <= 0 or up_to_step < 1: 215 return 0 216 # Step 1 always records 217 count = 1 218 if up_to_step >= 2: 219 # Multiples of rate in [1, up_to_step] = up_to_step // rate 220 # But step 1 is already counted, so subtract 1 if rate divides 1 (only when rate == 1) 221 count += up_to_step // rate 222 if rate == 1: 223 count -= 1 224 return count 225 226 @staticmethod 227 def _truncate_file(path, keep_lines): 228 """Truncate a file to keep only the first `keep_lines` lines.""" 229 if not path.exists(): 230 return 231 with open(path, "rb") as f: 232 lines = f.readlines() 233 if len(lines) <= keep_lines: 234 return # Nothing to truncate 235 with open(path, "wb") as f: 236 f.writelines(lines[:keep_lines]) 237 238 def initialize_recorders(self, TICKER_RATE, resuming=False): 239 self.terecorder = TERecorder(odir=self.odir, resuming=resuming) 240 self.picklerecorder = PickleRecorder(odir=self.odir) 241 self.popgenstatsrecorder = PopgenStatsRecorder(odir=self.odir) 242 self.guirecorder = IntervalRecorder(odir=self.odir, resuming=resuming) 243 self.flushrecorder = FlushRecorder(odir=self.odir, resuming=resuming) 244 self.featherrecorder = FeatherRecorder(odir=self.odir) 245 self.phenomaprecorder = PhenomapRecorder(odir=self.odir) 246 self.summaryrecorder = SummaryRecorder(odir=self.odir) 247 self.progressrecorder = ProgressRecorder(odir=self.odir, resuming=resuming) 248 self.simpleprogressrecorder = SimpleProgressRecorder(odir=self.odir) 249 self.ticker = Ticker(odir=self.odir, TICKER_RATE=TICKER_RATE) 250 self.popsizerecorder = PopsizeRecorder(odir=self.odir) 251 self.resourcerecorder = ResourcesRecorder(odir=self.odir) 252 self.configrecorder = ConfigRecorder(odir=self.odir) 253 self.envdriftmaprecorder = Envdriftmaprecorder(odir=self.odir) 254 self.checkpointrecorder = CheckpointRecorder(odir=self.odir) 255 256 ############# 257 # UTILITIES # 258 ############# 259 260 @staticmethod 261 def make_odir(custom_config_path, overwrite) -> pathlib.Path: 262 
output_path = custom_config_path.parent / custom_config_path.stem # remove .yml 263 is_occupied = output_path.exists() and output_path.is_dir() 264 if is_occupied: 265 if overwrite: 266 shutil.rmtree(output_path) 267 else: 268 raise Exception(f"{output_path} already exists. To overwrite, add flag --overwrite or -o.") 269 return output_path 270 271 @staticmethod 272 def make_subfolders(paths): 273 # TODO relink paths that are now in classes 274 for path in paths: 275 path.mkdir(exist_ok=True, parents=True) 276 277 def is_extinct(self) -> bool: 278 if self.summaryrecorder.extinct: 279 logging.info(f"Population went extinct (at step {variables.steps}).") 280 return True 281 return False
39class RecordingManager: 40 """ 41 Container class for various recorders. 42 Each recorder records a certain type of data. 43 Most recorders record data as tables, except SummaryRecorder and PickleRecorder which record JSON files and pickles (a binary python format). 44 Headers and indexes of all tabular files are explicitly recorded. 45 46 ----- 47 GUI 48 AEGIS records a lot of different data. 49 In brief, AEGIS records 50 genomic data (population-level allele frequencies and individual-level binary sequences) and 51 phenotypic data (observed population-level phenotypes and intrinsic individual-level phenotypes), 52 as well as 53 derived demographic data (life, death and birth tables), 54 population genetic data (e.g. effective population size, theta), and 55 survival analysis data (TE / time-event tables). 56 Furthermore, it records metadata (e.g. simulation log, processed configuration files) and python pickle files. 57 58 Recorded data is distributed in multiple files. 59 Almost all data are tabular, so each file is a table to which rows are appended as the simulation is running. 60 The recording rates are frequencies at which rows are added; they are expressed in simulation steps. 61 """ 62 63 def init(self, custom_config_path, overwrite): 64 self.odir = self.make_odir(custom_config_path=custom_config_path, overwrite=overwrite) 65 self.resuming = False 66 67 def init_for_resume(self, custom_config_path): 68 """Initialize for resume mode — reuse existing output directory, no overwrite.""" 69 output_path = custom_config_path.parent / custom_config_path.stem 70 if not output_path.exists(): 71 raise FileNotFoundError( 72 f"Cannot resume: output directory {output_path} does not exist." 73 ) 74 self.odir = output_path 75 self.resuming = True 76 77 def truncate_for_resume(self, checkpoint_step): 78 """Truncate output files back to the checkpoint step to avoid duplicate data. 79 80 Called after initialize_recorders so self.odir is set. 
The checkpoint is 81 saved at the end of run_step for a given step, and all recorders also 82 write during that same run_step. So the output files already contain data 83 for checkpoint_step. On resume the sim loop re-executes from checkpoint_step, 84 so we need to remove data from checkpoint_step onward. 85 86 For per-step files (1 line per step, no header), the number of lines to 87 keep is checkpoint_step - 1 (steps 1 through checkpoint_step-1). 88 89 For rate-based files (1 header line + 1 data line every RATE steps), 90 the number of data lines to keep is (checkpoint_step - 1) // RATE, 91 plus the header line(s). 92 """ 93 from aegis_sim.parameterization import parametermanager 94 95 step = checkpoint_step 96 97 # Per-step files: 1 line per step, no header. Keep step-1 lines. 98 per_step_files = [ 99 "popsize_before_reproduction.csv", 100 "popsize_after_reproduction.csv", 101 "eggnum_after_reproduction.csv", 102 "resources_before_scavenging.csv", 103 "resources_after_scavenging.csv", 104 ] 105 for fname in per_step_files: 106 self._truncate_file(self.odir / fname, keep_lines=step - 1) 107 108 # Rate-based files with 1 header line. 109 # Data is written when step % RATE == 0 or step == 1. 
110 # Number of data lines at step S: 1 (for step 1) + count of multiples of RATE in [2, S-1] 111 # Simpler: lines where skip() returns False for steps 1..S-1 112 113 rate_specs_1header = [ 114 ("progress.log", "LOGGING_RATE"), 115 ] 116 for fname, rate_name in rate_specs_1header: 117 rate = getattr(parametermanager.parameters, rate_name) 118 n_data = self._count_recordings(step - 1, rate) 119 self._truncate_file(self.odir / fname, keep_lines=1 + n_data) 120 121 # FlushRecorder spectra: 1 header + data at INTERVAL_RATE 122 spectra_dir = self.odir / "gui" / "spectra" 123 if spectra_dir.exists(): 124 rate = parametermanager.parameters.INTERVAL_RATE 125 n_data = self._count_recordings(step - 1, rate) 126 for csv_file in spectra_dir.glob("*.csv"): 127 self._truncate_file(csv_file, keep_lines=1 + n_data) 128 129 # IntervalRecorder: 2 header lines + data at INTERVAL_RATE 130 rate = parametermanager.parameters.INTERVAL_RATE 131 n_data = self._count_recordings(step - 1, rate) 132 for fname in ["gui/genotypes.csv", "gui/phenotypes.csv"]: 133 self._truncate_file(self.odir / fname, keep_lines=2 + n_data) 134 135 # PopgenStatsRecorder: no header, data at POPGENSTATS_RATE 136 popgen_dir = self.odir / "popgen" 137 if popgen_dir.exists(): 138 rate = parametermanager.parameters.POPGENSTATS_RATE 139 n_data = self._count_recordings(step - 1, rate) 140 for csv_file in popgen_dir.glob("*.csv"): 141 self._truncate_file(csv_file, keep_lines=n_data) 142 143 # Envdriftmap: no header, writes when step % ENVDRIFT_RATE == 0 (NOT at step 1 unless divisible). 144 # This differs from skip() logic, so we count multiples directly. 145 rate = parametermanager.parameters.ENVDRIFT_RATE 146 if rate > 0: 147 n_data = (step - 1) // rate # multiples of rate in [1, step-1] 148 self._truncate_file(self.odir / "envdriftmap.csv", keep_lines=n_data) 149 150 # TE files: numbered CSV files in te/ directory. 151 # A new TE file starts at step 1 and every TE_RATE steps. 
152 # TE_number increments when a file is flushed (at TE_DURATION offset or final step). 153 # We need to figure out which TE file was in-progress at the checkpoint step 154 # and delete any files started after it. 155 te_rate = parametermanager.parameters.TE_RATE 156 te_duration = parametermanager.parameters.TE_DURATION 157 if te_rate > 0: 158 te_dir = self.odir / "te" 159 if te_dir.exists(): 160 self._truncate_te_files(te_dir, step, te_rate, te_duration) 161 162 logging.info(f"Output files truncated to checkpoint step {step}.") 163 164 @staticmethod 165 def _truncate_te_files(te_dir, checkpoint_step, te_rate, te_duration): 166 """Handle TE file truncation on resume. 167 168 TE files are numbered 0.csv, 1.csv, etc. A new collection window opens 169 at step 1 and every TE_RATE steps. The file number increments when the 170 window is flushed. We need to: 171 1. Determine how many complete TE windows finished before checkpoint_step 172 2. Delete any TE files beyond that 173 3. Truncate the in-progress TE file (remove data recorded at/after checkpoint_step) 174 """ 175 # Count how many TE windows were fully completed before checkpoint_step. 176 # A window starts at step S where S % te_rate == 0 (or step 1). 177 # It flushes at S + te_duration (or at STEPS_PER_SIMULATION). 178 # Window 0 starts at step 1, flushes at step te_duration (if te_duration < te_rate). 179 # Window i starts at step i*te_rate, flushes at step i*te_rate + te_duration. 180 # A window is "complete" if its flush step < checkpoint_step. 181 182 # Number of complete windows: windows that started AND flushed before checkpoint_step 183 # Window starts: step 1, te_rate, 2*te_rate, 3*te_rate, ... 184 # The window starting at step W flushes at step W + te_duration. 185 # Complete if W + te_duration < checkpoint_step. 186 187 # For simplicity, just count existing TE files and remove those whose 188 # start step >= checkpoint_step. 189 # Window 0 starts at step 1. 
190 # Window k starts at step k * te_rate (for k >= 1), or step 1 for k=0. 191 192 existing_files = sorted(te_dir.glob("*.csv"), key=lambda p: int(p.stem)) 193 for f in existing_files: 194 file_num = int(f.stem) 195 # Window file_num starts at: step 1 if file_num==0, else file_num * te_rate 196 window_start = 1 if file_num == 0 else file_num * te_rate 197 if window_start >= checkpoint_step: 198 # This window started at or after checkpoint — delete it 199 f.unlink() 200 # If the window started before checkpoint but may contain data from 201 # steps >= checkpoint_step, we need to truncate those lines. 202 # TE files have: 1 header line ("T,E"), then data lines for each 203 # death event recorded during the window. We can't easily map lines 204 # to steps, so we leave partial windows as-is. The resumed sim will 205 # re-open a new window at the appropriate step, and the old partial 206 # data from the interrupted window is acceptable (it's death events 207 # that actually happened). 208 209 @staticmethod 210 def _count_recordings(up_to_step, rate): 211 """Count how many times a recorder with given rate would have written for steps 1..up_to_step. 212 213 Mirrors the skip() logic: always write at step 1, then write at every step divisible by rate. 
214 """ 215 if rate <= 0 or up_to_step < 1: 216 return 0 217 # Step 1 always records 218 count = 1 219 if up_to_step >= 2: 220 # Multiples of rate in [1, up_to_step] = up_to_step // rate 221 # But step 1 is already counted, so subtract 1 if rate divides 1 (only when rate == 1) 222 count += up_to_step // rate 223 if rate == 1: 224 count -= 1 225 return count 226 227 @staticmethod 228 def _truncate_file(path, keep_lines): 229 """Truncate a file to keep only the first `keep_lines` lines.""" 230 if not path.exists(): 231 return 232 with open(path, "rb") as f: 233 lines = f.readlines() 234 if len(lines) <= keep_lines: 235 return # Nothing to truncate 236 with open(path, "wb") as f: 237 f.writelines(lines[:keep_lines]) 238 239 def initialize_recorders(self, TICKER_RATE, resuming=False): 240 self.terecorder = TERecorder(odir=self.odir, resuming=resuming) 241 self.picklerecorder = PickleRecorder(odir=self.odir) 242 self.popgenstatsrecorder = PopgenStatsRecorder(odir=self.odir) 243 self.guirecorder = IntervalRecorder(odir=self.odir, resuming=resuming) 244 self.flushrecorder = FlushRecorder(odir=self.odir, resuming=resuming) 245 self.featherrecorder = FeatherRecorder(odir=self.odir) 246 self.phenomaprecorder = PhenomapRecorder(odir=self.odir) 247 self.summaryrecorder = SummaryRecorder(odir=self.odir) 248 self.progressrecorder = ProgressRecorder(odir=self.odir, resuming=resuming) 249 self.simpleprogressrecorder = SimpleProgressRecorder(odir=self.odir) 250 self.ticker = Ticker(odir=self.odir, TICKER_RATE=TICKER_RATE) 251 self.popsizerecorder = PopsizeRecorder(odir=self.odir) 252 self.resourcerecorder = ResourcesRecorder(odir=self.odir) 253 self.configrecorder = ConfigRecorder(odir=self.odir) 254 self.envdriftmaprecorder = Envdriftmaprecorder(odir=self.odir) 255 self.checkpointrecorder = CheckpointRecorder(odir=self.odir) 256 257 ############# 258 # UTILITIES # 259 ############# 260 261 @staticmethod 262 def make_odir(custom_config_path, overwrite) -> pathlib.Path: 263 
output_path = custom_config_path.parent / custom_config_path.stem # remove .yml 264 is_occupied = output_path.exists() and output_path.is_dir() 265 if is_occupied: 266 if overwrite: 267 shutil.rmtree(output_path) 268 else: 269 raise Exception(f"{output_path} already exists. To overwrite, add flag --overwrite or -o.") 270 return output_path 271 272 @staticmethod 273 def make_subfolders(paths): 274 # TODO relink paths that are now in classes 275 for path in paths: 276 path.mkdir(exist_ok=True, parents=True) 277 278 def is_extinct(self) -> bool: 279 if self.summaryrecorder.extinct: 280 logging.info(f"Population went extinct (at step {variables.steps}).") 281 return True 282 return False
Container class for various recorders. Each recorder records a certain type of data. Most recorders record data as tables, except SummaryRecorder and PickleRecorder which record JSON files and pickles (a binary python format). Headers and indexes of all tabular files are explicitly recorded.
GUI AEGIS records a lot of different data. In brief, AEGIS records genomic data (population-level allele frequencies and individual-level binary sequences) and phenotypic data (observed population-level phenotypes and intrinsic individual-level phenotypes), as well as derived demographic data (life, death and birth tables), population genetic data (e.g. effective population size, theta), and survival analysis data (TE / time-event tables). Furthermore, it records metadata (e.g. simulation log, processed configuration files) and python pickle files.
Recorded data is distributed in multiple files. Almost all data are tabular, so each file is a table to which rows are appended as the simulation is running. The recording rates are frequencies at which rows are added; they are expressed in simulation steps.
def init_for_resume(self, custom_config_path):
    """Initialize for resume mode — reuse the existing output directory without overwriting.

    The directory is derived from the config path exactly as in make_odir
    (config stem, i.e. the .yml suffix removed).

    Raises:
        FileNotFoundError: the derived directory is missing, so there is
            nothing to resume from.
    """
    candidate = custom_config_path.parent / custom_config_path.stem
    if candidate.exists():
        self.odir = candidate
        self.resuming = True
        return
    raise FileNotFoundError(
        f"Cannot resume: output directory {candidate} does not exist."
    )
Initialize for resume mode — reuse existing output directory, no overwrite.
def truncate_for_resume(self, checkpoint_step):
    """Truncate output files back to the checkpoint step to avoid duplicate data.

    Called after initialize_recorders so self.odir is set. The checkpoint is
    saved at the end of run_step for a given step, and all recorders also
    write during that same run_step. So the output files already contain data
    for checkpoint_step. On resume the sim loop re-executes from checkpoint_step,
    so we need to remove data from checkpoint_step onward.

    For per-step files (1 line per step, no header), the number of lines to
    keep is checkpoint_step - 1 (steps 1 through checkpoint_step-1).

    For rate-based files (1 header line + 1 data line every RATE steps),
    the number of data lines to keep is (checkpoint_step - 1) // RATE,
    plus the header line(s).
    """
    # Local import — presumably to avoid a circular import between the
    # recording and parameterization packages; confirm before hoisting.
    from aegis_sim.parameterization import parametermanager

    step = checkpoint_step

    # Per-step files: 1 line per step, no header. Keep step-1 lines.
    per_step_files = [
        "popsize_before_reproduction.csv",
        "popsize_after_reproduction.csv",
        "eggnum_after_reproduction.csv",
        "resources_before_scavenging.csv",
        "resources_after_scavenging.csv",
    ]
    for fname in per_step_files:
        self._truncate_file(self.odir / fname, keep_lines=step - 1)

    # Rate-based files with 1 header line.
    # Data is written when step % RATE == 0 or step == 1.
    # Number of data lines at step S: 1 (for step 1) + count of multiples of RATE in [2, S-1]
    # Simpler: lines where skip() returns False for steps 1..S-1

    rate_specs_1header = [
        ("progress.log", "LOGGING_RATE"),
    ]
    for fname, rate_name in rate_specs_1header:
        rate = getattr(parametermanager.parameters, rate_name)
        n_data = self._count_recordings(step - 1, rate)
        self._truncate_file(self.odir / fname, keep_lines=1 + n_data)

    # FlushRecorder spectra: 1 header + data at INTERVAL_RATE
    spectra_dir = self.odir / "gui" / "spectra"
    if spectra_dir.exists():
        rate = parametermanager.parameters.INTERVAL_RATE
        n_data = self._count_recordings(step - 1, rate)
        for csv_file in spectra_dir.glob("*.csv"):
            self._truncate_file(csv_file, keep_lines=1 + n_data)

    # IntervalRecorder: 2 header lines + data at INTERVAL_RATE
    rate = parametermanager.parameters.INTERVAL_RATE
    n_data = self._count_recordings(step - 1, rate)
    for fname in ["gui/genotypes.csv", "gui/phenotypes.csv"]:
        self._truncate_file(self.odir / fname, keep_lines=2 + n_data)

    # PopgenStatsRecorder: no header, data at POPGENSTATS_RATE
    popgen_dir = self.odir / "popgen"
    if popgen_dir.exists():
        rate = parametermanager.parameters.POPGENSTATS_RATE
        n_data = self._count_recordings(step - 1, rate)
        for csv_file in popgen_dir.glob("*.csv"):
            self._truncate_file(csv_file, keep_lines=n_data)

    # Envdriftmap: no header, writes when step % ENVDRIFT_RATE == 0 (NOT at step 1 unless divisible).
    # This differs from skip() logic, so we count multiples directly.
    rate = parametermanager.parameters.ENVDRIFT_RATE
    if rate > 0:
        n_data = (step - 1) // rate  # multiples of rate in [1, step-1]
        self._truncate_file(self.odir / "envdriftmap.csv", keep_lines=n_data)

    # TE files: numbered CSV files in te/ directory.
    # A new TE file starts at step 1 and every TE_RATE steps.
    # TE_number increments when a file is flushed (at TE_DURATION offset or final step).
    # We need to figure out which TE file was in-progress at the checkpoint step
    # and delete any files started after it.
    te_rate = parametermanager.parameters.TE_RATE
    te_duration = parametermanager.parameters.TE_DURATION
    if te_rate > 0:
        te_dir = self.odir / "te"
        if te_dir.exists():
            self._truncate_te_files(te_dir, step, te_rate, te_duration)

    logging.info(f"Output files truncated to checkpoint step {step}.")
Truncate output files back to the checkpoint step to avoid duplicate data.
Called after initialize_recorders so self.odir is set. The checkpoint is saved at the end of run_step for a given step, and all recorders also write during that same run_step. So the output files already contain data for checkpoint_step. On resume the sim loop re-executes from checkpoint_step, so we need to remove data from checkpoint_step onward.
For per-step files (1 line per step, no header), the number of lines to keep is checkpoint_step - 1 (steps 1 through checkpoint_step-1).
For rate-based files (1 header line + 1 data line every RATE steps), the number of data lines to keep is (checkpoint_step - 1) // RATE, plus the header line(s).
def initialize_recorders(self, TICKER_RATE, resuming=False):
    # Instantiate one recorder per output artifact; all share self.odir as the
    # output directory. `resuming` is forwarded only to the recorders that
    # append to files which already exist after a resume — presumably so they
    # do not re-initialize those files; confirm in the recorder classes.
    self.terecorder = TERecorder(odir=self.odir, resuming=resuming)
    self.picklerecorder = PickleRecorder(odir=self.odir)
    self.popgenstatsrecorder = PopgenStatsRecorder(odir=self.odir)
    self.guirecorder = IntervalRecorder(odir=self.odir, resuming=resuming)
    self.flushrecorder = FlushRecorder(odir=self.odir, resuming=resuming)
    self.featherrecorder = FeatherRecorder(odir=self.odir)
    self.phenomaprecorder = PhenomapRecorder(odir=self.odir)
    self.summaryrecorder = SummaryRecorder(odir=self.odir)
    self.progressrecorder = ProgressRecorder(odir=self.odir, resuming=resuming)
    self.simpleprogressrecorder = SimpleProgressRecorder(odir=self.odir)
    self.ticker = Ticker(odir=self.odir, TICKER_RATE=TICKER_RATE)
    self.popsizerecorder = PopsizeRecorder(odir=self.odir)
    self.resourcerecorder = ResourcesRecorder(odir=self.odir)
    self.configrecorder = ConfigRecorder(odir=self.odir)
    self.envdriftmaprecorder = Envdriftmaprecorder(odir=self.odir)
    self.checkpointrecorder = CheckpointRecorder(odir=self.odir)
@staticmethod
def make_odir(custom_config_path, overwrite) -> pathlib.Path:
    """Derive the simulation output directory from the config file path.

    The directory sits next to the config file and is named after its stem
    (i.e. the .yml suffix removed). If it already exists as a directory it is
    wiped when overwrite is True, otherwise an error is raised. The directory
    itself is not created here — only the path is returned.

    Raises:
        FileExistsError: the directory exists and overwrite is False.
            (Was a bare Exception; FileExistsError is more precise and is
            still caught by any `except Exception` in callers.)
    """
    output_path = custom_config_path.parent / custom_config_path.stem  # remove .yml
    # NOTE(review): a plain *file* at output_path is not treated as occupied
    # and would only fail later when the directory is created — confirm
    # whether that case should raise here instead.
    is_occupied = output_path.exists() and output_path.is_dir()
    if is_occupied:
        if overwrite:
            shutil.rmtree(output_path)
        else:
            raise FileExistsError(f"{output_path} already exists. To overwrite, add flag --overwrite or -o.")
    return output_path