aegis_sim.recording.recordingmanager

Data recorder

Records data generated by the simulation.

When thinking about recording additional data, consider that there are four recording methods: I. Snapshots (record data from the population at a specific step) II. Flushes (collect data over time then flush) III. One-time records IV. Other: TE records

  1"""Data recorder
  2
  3Records data generated by the simulation.
  4
  5When thinking about recording additional data, consider that there are four recording methods:
  6    I. Snapshots (record data from the population at a specific step)
  7    II. Flushes (collect data over time then flush)
  8    III. One-time records
  9    IV. Other: TE records
 10"""
 11
 12import pathlib
 13import shutil
 14import logging
 15
 16from aegis_sim import variables
 17
 18from .terecorder import TERecorder
 19from .picklerecorder import PickleRecorder
 20from .popgenstatsrecorder import PopgenStatsRecorder
 21from .intervalrecorder import IntervalRecorder
 22from .flushrecorder import FlushRecorder
 23from .featherrecorder import FeatherRecorder
 24from .phenomaprecorder import PhenomapRecorder
 25from .summaryrecorder import SummaryRecorder
 26from .progressrecorder import ProgressRecorder
 27from .simpleprogressrecorder import SimpleProgressRecorder
 28from .popsizerecorder import PopsizeRecorder
 29from .resourcerecorder import ResourcesRecorder
 30from .ticker import Ticker
 31from .configrecorder import ConfigRecorder
 32from .envdriftmaprecorder import Envdriftmaprecorder
 33from .checkpointrecorder import CheckpointRecorder
 34
 35# TODO write tests
 36
 37
 38class RecordingManager:
 39    """
 40    Container class for various recorders.
 41    Each recorder records a certain type of data.
 42    Most recorders record data as tables, except SummaryRecorder and PickleRecorder which record JSON files and pickles (a binary python format).
 43    Headers and indexes of all tabular files are explicitly recorded.
 44
 45    -----
 46    GUI
 47    AEGIS records a lot of different data.
 48    In brief, AEGIS records
 49    genomic data (population-level allele frequencies and individual-level binary sequences) and
 50    phenotypic data (observed population-level phenotypes and intrinsic individual-level phenotypes),
 51    as well as
 52    derived demographic data (life, death and birth tables),
 53    population genetic data (e.g. effective population size, theta), and
 54    survival analysis data (TE / time-event tables).
 55    Furthermore, it records metadata (e.g. simulation log, processed configuration files) and python pickle files.
 56
 57    Recorded data is distributed in multiple files.
 58    Almost all data are tabular, so each file is a table to which rows are appended as the simulation is running.
 59    The recording rates are frequencies at which rows are added; they are expressed in simulation steps.
 60    """
 61
 62    def init(self, custom_config_path, overwrite):
 63        self.odir = self.make_odir(custom_config_path=custom_config_path, overwrite=overwrite)
 64        self.resuming = False
 65
 66    def init_for_resume(self, custom_config_path):
 67        """Initialize for resume mode — reuse existing output directory, no overwrite."""
 68        output_path = custom_config_path.parent / custom_config_path.stem
 69        if not output_path.exists():
 70            raise FileNotFoundError(
 71                f"Cannot resume: output directory {output_path} does not exist."
 72            )
 73        self.odir = output_path
 74        self.resuming = True
 75
 76    def truncate_for_resume(self, checkpoint_step):
 77        """Truncate output files back to the checkpoint step to avoid duplicate data.
 78
 79        Called after initialize_recorders so self.odir is set. The checkpoint is
 80        saved at the end of run_step for a given step, and all recorders also
 81        write during that same run_step. So the output files already contain data
 82        for checkpoint_step. On resume the sim loop re-executes from checkpoint_step,
 83        so we need to remove data from checkpoint_step onward.
 84
 85        For per-step files (1 line per step, no header), the number of lines to
 86        keep is checkpoint_step - 1 (steps 1 through checkpoint_step-1).
 87
 88        For rate-based files (1 header line + 1 data line every RATE steps),
 89        the number of data lines to keep is (checkpoint_step - 1) // RATE,
 90        plus the header line(s).
 91        """
 92        from aegis_sim.parameterization import parametermanager
 93
 94        step = checkpoint_step
 95
 96        # Per-step files: 1 line per step, no header. Keep step-1 lines.
 97        per_step_files = [
 98            "popsize_before_reproduction.csv",
 99            "popsize_after_reproduction.csv",
100            "eggnum_after_reproduction.csv",
101            "resources_before_scavenging.csv",
102            "resources_after_scavenging.csv",
103        ]
104        for fname in per_step_files:
105            self._truncate_file(self.odir / fname, keep_lines=step - 1)
106
107        # Rate-based files with 1 header line.
108        # Data is written when step % RATE == 0 or step == 1.
109        # Number of data lines at step S: 1 (for step 1) + count of multiples of RATE in [2, S-1]
110        # Simpler: lines where skip() returns False for steps 1..S-1
111
112        rate_specs_1header = [
113            ("progress.log", "LOGGING_RATE"),
114        ]
115        for fname, rate_name in rate_specs_1header:
116            rate = getattr(parametermanager.parameters, rate_name)
117            n_data = self._count_recordings(step - 1, rate)
118            self._truncate_file(self.odir / fname, keep_lines=1 + n_data)
119
120        # FlushRecorder spectra: 1 header + data at INTERVAL_RATE
121        spectra_dir = self.odir / "gui" / "spectra"
122        if spectra_dir.exists():
123            rate = parametermanager.parameters.INTERVAL_RATE
124            n_data = self._count_recordings(step - 1, rate)
125            for csv_file in spectra_dir.glob("*.csv"):
126                self._truncate_file(csv_file, keep_lines=1 + n_data)
127
128        # IntervalRecorder: 2 header lines + data at INTERVAL_RATE
129        rate = parametermanager.parameters.INTERVAL_RATE
130        n_data = self._count_recordings(step - 1, rate)
131        for fname in ["gui/genotypes.csv", "gui/phenotypes.csv"]:
132            self._truncate_file(self.odir / fname, keep_lines=2 + n_data)
133
134        # PopgenStatsRecorder: no header, data at POPGENSTATS_RATE
135        popgen_dir = self.odir / "popgen"
136        if popgen_dir.exists():
137            rate = parametermanager.parameters.POPGENSTATS_RATE
138            n_data = self._count_recordings(step - 1, rate)
139            for csv_file in popgen_dir.glob("*.csv"):
140                self._truncate_file(csv_file, keep_lines=n_data)
141
142        # Envdriftmap: no header, writes when step % ENVDRIFT_RATE == 0 (NOT at step 1 unless divisible).
143        # This differs from skip() logic, so we count multiples directly.
144        rate = parametermanager.parameters.ENVDRIFT_RATE
145        if rate > 0:
146            n_data = (step - 1) // rate  # multiples of rate in [1, step-1]
147            self._truncate_file(self.odir / "envdriftmap.csv", keep_lines=n_data)
148
149        # TE files: numbered CSV files in te/ directory.
150        # A new TE file starts at step 1 and every TE_RATE steps.
151        # TE_number increments when a file is flushed (at TE_DURATION offset or final step).
152        # We need to figure out which TE file was in-progress at the checkpoint step
153        # and delete any files started after it.
154        te_rate = parametermanager.parameters.TE_RATE
155        te_duration = parametermanager.parameters.TE_DURATION
156        if te_rate > 0:
157            te_dir = self.odir / "te"
158            if te_dir.exists():
159                self._truncate_te_files(te_dir, step, te_rate, te_duration)
160
161        logging.info(f"Output files truncated to checkpoint step {step}.")
162
163    @staticmethod
164    def _truncate_te_files(te_dir, checkpoint_step, te_rate, te_duration):
165        """Handle TE file truncation on resume.
166
167        TE files are numbered 0.csv, 1.csv, etc. A new collection window opens
168        at step 1 and every TE_RATE steps. The file number increments when the
169        window is flushed. We need to:
170        1. Determine how many complete TE windows finished before checkpoint_step
171        2. Delete any TE files beyond that
172        3. Truncate the in-progress TE file (remove data recorded at/after checkpoint_step)
173        """
174        # Count how many TE windows were fully completed before checkpoint_step.
175        # A window starts at step S where S % te_rate == 0 (or step 1).
176        # It flushes at S + te_duration (or at STEPS_PER_SIMULATION).
177        # Window 0 starts at step 1, flushes at step te_duration (if te_duration < te_rate).
178        # Window i starts at step i*te_rate, flushes at step i*te_rate + te_duration.
179        # A window is "complete" if its flush step < checkpoint_step.
180
181        # Number of complete windows: windows that started AND flushed before checkpoint_step
182        # Window starts: step 1, te_rate, 2*te_rate, 3*te_rate, ...
183        # The window starting at step W flushes at step W + te_duration.
184        # Complete if W + te_duration < checkpoint_step.
185
186        # For simplicity, just count existing TE files and remove those whose
187        # start step >= checkpoint_step.
188        # Window 0 starts at step 1.
189        # Window k starts at step k * te_rate (for k >= 1), or step 1 for k=0.
190
191        existing_files = sorted(te_dir.glob("*.csv"), key=lambda p: int(p.stem))
192        for f in existing_files:
193            file_num = int(f.stem)
194            # Window file_num starts at: step 1 if file_num==0, else file_num * te_rate
195            window_start = 1 if file_num == 0 else file_num * te_rate
196            if window_start >= checkpoint_step:
197                # This window started at or after checkpoint — delete it
198                f.unlink()
199            # If the window started before checkpoint but may contain data from
200            # steps >= checkpoint_step, we need to truncate those lines.
201            # TE files have: 1 header line ("T,E"), then data lines for each
202            # death event recorded during the window. We can't easily map lines
203            # to steps, so we leave partial windows as-is. The resumed sim will
204            # re-open a new window at the appropriate step, and the old partial
205            # data from the interrupted window is acceptable (it's death events
206            # that actually happened).
207
208    @staticmethod
209    def _count_recordings(up_to_step, rate):
210        """Count how many times a recorder with given rate would have written for steps 1..up_to_step.
211
212        Mirrors the skip() logic: always write at step 1, then write at every step divisible by rate.
213        """
214        if rate <= 0 or up_to_step < 1:
215            return 0
216        # Step 1 always records
217        count = 1
218        if up_to_step >= 2:
219            # Multiples of rate in [1, up_to_step] = up_to_step // rate
220            # But step 1 is already counted, so subtract 1 if rate divides 1 (only when rate == 1)
221            count += up_to_step // rate
222            if rate == 1:
223                count -= 1
224        return count
225
226    @staticmethod
227    def _truncate_file(path, keep_lines):
228        """Truncate a file to keep only the first `keep_lines` lines."""
229        if not path.exists():
230            return
231        with open(path, "rb") as f:
232            lines = f.readlines()
233        if len(lines) <= keep_lines:
234            return  # Nothing to truncate
235        with open(path, "wb") as f:
236            f.writelines(lines[:keep_lines])
237
238    def initialize_recorders(self, TICKER_RATE, resuming=False):
239        self.terecorder = TERecorder(odir=self.odir, resuming=resuming)
240        self.picklerecorder = PickleRecorder(odir=self.odir)
241        self.popgenstatsrecorder = PopgenStatsRecorder(odir=self.odir)
242        self.guirecorder = IntervalRecorder(odir=self.odir, resuming=resuming)
243        self.flushrecorder = FlushRecorder(odir=self.odir, resuming=resuming)
244        self.featherrecorder = FeatherRecorder(odir=self.odir)
245        self.phenomaprecorder = PhenomapRecorder(odir=self.odir)
246        self.summaryrecorder = SummaryRecorder(odir=self.odir)
247        self.progressrecorder = ProgressRecorder(odir=self.odir, resuming=resuming)
248        self.simpleprogressrecorder = SimpleProgressRecorder(odir=self.odir)
249        self.ticker = Ticker(odir=self.odir, TICKER_RATE=TICKER_RATE)
250        self.popsizerecorder = PopsizeRecorder(odir=self.odir)
251        self.resourcerecorder = ResourcesRecorder(odir=self.odir)
252        self.configrecorder = ConfigRecorder(odir=self.odir)
253        self.envdriftmaprecorder = Envdriftmaprecorder(odir=self.odir)
254        self.checkpointrecorder = CheckpointRecorder(odir=self.odir)
255
256    #############
257    # UTILITIES #
258    #############
259
260    @staticmethod
261    def make_odir(custom_config_path, overwrite) -> pathlib.Path:
262        output_path = custom_config_path.parent / custom_config_path.stem  # remove .yml
263        is_occupied = output_path.exists() and output_path.is_dir()
264        if is_occupied:
265            if overwrite:
266                shutil.rmtree(output_path)
267            else:
268                raise Exception(f"{output_path} already exists. To overwrite, add flag --overwrite or -o.")
269        return output_path
270
271    @staticmethod
272    def make_subfolders(paths):
273        # TODO relink paths that are now in classes
274        for path in paths:
275            path.mkdir(exist_ok=True, parents=True)
276
277    def is_extinct(self) -> bool:
278        if self.summaryrecorder.extinct:
279            logging.info(f"Population went extinct (at step {variables.steps}).")
280            return True
281        return False
class RecordingManager:
 39class RecordingManager:
 40    """
 41    Container class for various recorders.
 42    Each recorder records a certain type of data.
 43    Most recorders record data as tables, except SummaryRecorder and PickleRecorder which record JSON files and pickles (a binary python format).
 44    Headers and indexes of all tabular files are explicitly recorded.
 45
 46    -----
 47    GUI
 48    AEGIS records a lot of different data.
 49    In brief, AEGIS records
 50    genomic data (population-level allele frequencies and individual-level binary sequences) and
 51    phenotypic data (observed population-level phenotypes and intrinsic individual-level phenotypes),
 52    as well as
 53    derived demographic data (life, death and birth tables),
 54    population genetic data (e.g. effective population size, theta), and
 55    survival analysis data (TE / time-event tables).
 56    Furthermore, it records metadata (e.g. simulation log, processed configuration files) and python pickle files.
 57
 58    Recorded data is distributed in multiple files.
 59    Almost all data are tabular, so each file is a table to which rows are appended as the simulation is running.
 60    The recording rates are frequencies at which rows are added; they are expressed in simulation steps.
 61    """
 62
 63    def init(self, custom_config_path, overwrite):
 64        self.odir = self.make_odir(custom_config_path=custom_config_path, overwrite=overwrite)
 65        self.resuming = False
 66
 67    def init_for_resume(self, custom_config_path):
 68        """Initialize for resume mode — reuse existing output directory, no overwrite."""
 69        output_path = custom_config_path.parent / custom_config_path.stem
 70        if not output_path.exists():
 71            raise FileNotFoundError(
 72                f"Cannot resume: output directory {output_path} does not exist."
 73            )
 74        self.odir = output_path
 75        self.resuming = True
 76
 77    def truncate_for_resume(self, checkpoint_step):
 78        """Truncate output files back to the checkpoint step to avoid duplicate data.
 79
 80        Called after initialize_recorders so self.odir is set. The checkpoint is
 81        saved at the end of run_step for a given step, and all recorders also
 82        write during that same run_step. So the output files already contain data
 83        for checkpoint_step. On resume the sim loop re-executes from checkpoint_step,
 84        so we need to remove data from checkpoint_step onward.
 85
 86        For per-step files (1 line per step, no header), the number of lines to
 87        keep is checkpoint_step - 1 (steps 1 through checkpoint_step-1).
 88
 89        For rate-based files (1 header line + 1 data line every RATE steps),
 90        the number of data lines to keep is (checkpoint_step - 1) // RATE,
 91        plus the header line(s).
 92        """
 93        from aegis_sim.parameterization import parametermanager
 94
 95        step = checkpoint_step
 96
 97        # Per-step files: 1 line per step, no header. Keep step-1 lines.
 98        per_step_files = [
 99            "popsize_before_reproduction.csv",
100            "popsize_after_reproduction.csv",
101            "eggnum_after_reproduction.csv",
102            "resources_before_scavenging.csv",
103            "resources_after_scavenging.csv",
104        ]
105        for fname in per_step_files:
106            self._truncate_file(self.odir / fname, keep_lines=step - 1)
107
108        # Rate-based files with 1 header line.
109        # Data is written when step % RATE == 0 or step == 1.
110        # Number of data lines at step S: 1 (for step 1) + count of multiples of RATE in [2, S-1]
111        # Simpler: lines where skip() returns False for steps 1..S-1
112
113        rate_specs_1header = [
114            ("progress.log", "LOGGING_RATE"),
115        ]
116        for fname, rate_name in rate_specs_1header:
117            rate = getattr(parametermanager.parameters, rate_name)
118            n_data = self._count_recordings(step - 1, rate)
119            self._truncate_file(self.odir / fname, keep_lines=1 + n_data)
120
121        # FlushRecorder spectra: 1 header + data at INTERVAL_RATE
122        spectra_dir = self.odir / "gui" / "spectra"
123        if spectra_dir.exists():
124            rate = parametermanager.parameters.INTERVAL_RATE
125            n_data = self._count_recordings(step - 1, rate)
126            for csv_file in spectra_dir.glob("*.csv"):
127                self._truncate_file(csv_file, keep_lines=1 + n_data)
128
129        # IntervalRecorder: 2 header lines + data at INTERVAL_RATE
130        rate = parametermanager.parameters.INTERVAL_RATE
131        n_data = self._count_recordings(step - 1, rate)
132        for fname in ["gui/genotypes.csv", "gui/phenotypes.csv"]:
133            self._truncate_file(self.odir / fname, keep_lines=2 + n_data)
134
135        # PopgenStatsRecorder: no header, data at POPGENSTATS_RATE
136        popgen_dir = self.odir / "popgen"
137        if popgen_dir.exists():
138            rate = parametermanager.parameters.POPGENSTATS_RATE
139            n_data = self._count_recordings(step - 1, rate)
140            for csv_file in popgen_dir.glob("*.csv"):
141                self._truncate_file(csv_file, keep_lines=n_data)
142
143        # Envdriftmap: no header, writes when step % ENVDRIFT_RATE == 0 (NOT at step 1 unless divisible).
144        # This differs from skip() logic, so we count multiples directly.
145        rate = parametermanager.parameters.ENVDRIFT_RATE
146        if rate > 0:
147            n_data = (step - 1) // rate  # multiples of rate in [1, step-1]
148            self._truncate_file(self.odir / "envdriftmap.csv", keep_lines=n_data)
149
150        # TE files: numbered CSV files in te/ directory.
151        # A new TE file starts at step 1 and every TE_RATE steps.
152        # TE_number increments when a file is flushed (at TE_DURATION offset or final step).
153        # We need to figure out which TE file was in-progress at the checkpoint step
154        # and delete any files started after it.
155        te_rate = parametermanager.parameters.TE_RATE
156        te_duration = parametermanager.parameters.TE_DURATION
157        if te_rate > 0:
158            te_dir = self.odir / "te"
159            if te_dir.exists():
160                self._truncate_te_files(te_dir, step, te_rate, te_duration)
161
162        logging.info(f"Output files truncated to checkpoint step {step}.")
163
164    @staticmethod
165    def _truncate_te_files(te_dir, checkpoint_step, te_rate, te_duration):
166        """Handle TE file truncation on resume.
167
168        TE files are numbered 0.csv, 1.csv, etc. A new collection window opens
169        at step 1 and every TE_RATE steps. The file number increments when the
170        window is flushed. We need to:
171        1. Determine how many complete TE windows finished before checkpoint_step
172        2. Delete any TE files beyond that
173        3. Truncate the in-progress TE file (remove data recorded at/after checkpoint_step)
174        """
175        # Count how many TE windows were fully completed before checkpoint_step.
176        # A window starts at step S where S % te_rate == 0 (or step 1).
177        # It flushes at S + te_duration (or at STEPS_PER_SIMULATION).
178        # Window 0 starts at step 1, flushes at step te_duration (if te_duration < te_rate).
179        # Window i starts at step i*te_rate, flushes at step i*te_rate + te_duration.
180        # A window is "complete" if its flush step < checkpoint_step.
181
182        # Number of complete windows: windows that started AND flushed before checkpoint_step
183        # Window starts: step 1, te_rate, 2*te_rate, 3*te_rate, ...
184        # The window starting at step W flushes at step W + te_duration.
185        # Complete if W + te_duration < checkpoint_step.
186
187        # For simplicity, just count existing TE files and remove those whose
188        # start step >= checkpoint_step.
189        # Window 0 starts at step 1.
190        # Window k starts at step k * te_rate (for k >= 1), or step 1 for k=0.
191
192        existing_files = sorted(te_dir.glob("*.csv"), key=lambda p: int(p.stem))
193        for f in existing_files:
194            file_num = int(f.stem)
195            # Window file_num starts at: step 1 if file_num==0, else file_num * te_rate
196            window_start = 1 if file_num == 0 else file_num * te_rate
197            if window_start >= checkpoint_step:
198                # This window started at or after checkpoint — delete it
199                f.unlink()
200            # If the window started before checkpoint but may contain data from
201            # steps >= checkpoint_step, we need to truncate those lines.
202            # TE files have: 1 header line ("T,E"), then data lines for each
203            # death event recorded during the window. We can't easily map lines
204            # to steps, so we leave partial windows as-is. The resumed sim will
205            # re-open a new window at the appropriate step, and the old partial
206            # data from the interrupted window is acceptable (it's death events
207            # that actually happened).
208
209    @staticmethod
210    def _count_recordings(up_to_step, rate):
211        """Count how many times a recorder with given rate would have written for steps 1..up_to_step.
212
213        Mirrors the skip() logic: always write at step 1, then write at every step divisible by rate.
214        """
215        if rate <= 0 or up_to_step < 1:
216            return 0
217        # Step 1 always records
218        count = 1
219        if up_to_step >= 2:
220            # Multiples of rate in [1, up_to_step] = up_to_step // rate
221            # But step 1 is already counted, so subtract 1 if rate divides 1 (only when rate == 1)
222            count += up_to_step // rate
223            if rate == 1:
224                count -= 1
225        return count
226
227    @staticmethod
228    def _truncate_file(path, keep_lines):
229        """Truncate a file to keep only the first `keep_lines` lines."""
230        if not path.exists():
231            return
232        with open(path, "rb") as f:
233            lines = f.readlines()
234        if len(lines) <= keep_lines:
235            return  # Nothing to truncate
236        with open(path, "wb") as f:
237            f.writelines(lines[:keep_lines])
238
239    def initialize_recorders(self, TICKER_RATE, resuming=False):
240        self.terecorder = TERecorder(odir=self.odir, resuming=resuming)
241        self.picklerecorder = PickleRecorder(odir=self.odir)
242        self.popgenstatsrecorder = PopgenStatsRecorder(odir=self.odir)
243        self.guirecorder = IntervalRecorder(odir=self.odir, resuming=resuming)
244        self.flushrecorder = FlushRecorder(odir=self.odir, resuming=resuming)
245        self.featherrecorder = FeatherRecorder(odir=self.odir)
246        self.phenomaprecorder = PhenomapRecorder(odir=self.odir)
247        self.summaryrecorder = SummaryRecorder(odir=self.odir)
248        self.progressrecorder = ProgressRecorder(odir=self.odir, resuming=resuming)
249        self.simpleprogressrecorder = SimpleProgressRecorder(odir=self.odir)
250        self.ticker = Ticker(odir=self.odir, TICKER_RATE=TICKER_RATE)
251        self.popsizerecorder = PopsizeRecorder(odir=self.odir)
252        self.resourcerecorder = ResourcesRecorder(odir=self.odir)
253        self.configrecorder = ConfigRecorder(odir=self.odir)
254        self.envdriftmaprecorder = Envdriftmaprecorder(odir=self.odir)
255        self.checkpointrecorder = CheckpointRecorder(odir=self.odir)
256
257    #############
258    # UTILITIES #
259    #############
260
261    @staticmethod
262    def make_odir(custom_config_path, overwrite) -> pathlib.Path:
263        output_path = custom_config_path.parent / custom_config_path.stem  # remove .yml
264        is_occupied = output_path.exists() and output_path.is_dir()
265        if is_occupied:
266            if overwrite:
267                shutil.rmtree(output_path)
268            else:
269                raise Exception(f"{output_path} already exists. To overwrite, add flag --overwrite or -o.")
270        return output_path
271
272    @staticmethod
273    def make_subfolders(paths):
274        # TODO relink paths that are now in classes
275        for path in paths:
276            path.mkdir(exist_ok=True, parents=True)
277
278    def is_extinct(self) -> bool:
279        if self.summaryrecorder.extinct:
280            logging.info(f"Population went extinct (at step {variables.steps}).")
281            return True
282        return False

Container class for various recorders. Each recorder records a certain type of data. Most recorders record data as tables, except SummaryRecorder and PickleRecorder which record JSON files and pickles (a binary python format). Headers and indexes of all tabular files are explicitly recorded.


GUI AEGIS records a lot of different data. In brief, AEGIS records genomic data (population-level allele frequencies and individual-level binary sequences) and phenotypic data (observed population-level phenotypes and intrinsic individual-level phenotypes), as well as derived demographic data (life, death and birth tables), population genetic data (e.g. effective population size, theta), and survival analysis data (TE / time-event tables). Furthermore, it records metadata (e.g. simulation log, processed configuration files) and python pickle files.

Recorded data is distributed in multiple files. Almost all data are tabular, so each file is a table to which rows are appended as the simulation is running. The recording rates are frequencies at which rows are added; they are expressed in simulation steps.

def init(self, custom_config_path, overwrite):
63    def init(self, custom_config_path, overwrite):
64        self.odir = self.make_odir(custom_config_path=custom_config_path, overwrite=overwrite)
65        self.resuming = False
def init_for_resume(self, custom_config_path):
67    def init_for_resume(self, custom_config_path):
68        """Initialize for resume mode — reuse existing output directory, no overwrite."""
69        output_path = custom_config_path.parent / custom_config_path.stem
70        if not output_path.exists():
71            raise FileNotFoundError(
72                f"Cannot resume: output directory {output_path} does not exist."
73            )
74        self.odir = output_path
75        self.resuming = True

Initialize for resume mode — reuse existing output directory, no overwrite.

def truncate_for_resume(self, checkpoint_step):
 77    def truncate_for_resume(self, checkpoint_step):
 78        """Truncate output files back to the checkpoint step to avoid duplicate data.
 79
 80        Called after initialize_recorders so self.odir is set. The checkpoint is
 81        saved at the end of run_step for a given step, and all recorders also
 82        write during that same run_step. So the output files already contain data
 83        for checkpoint_step. On resume the sim loop re-executes from checkpoint_step,
 84        so we need to remove data from checkpoint_step onward.
 85
 86        For per-step files (1 line per step, no header), the number of lines to
 87        keep is checkpoint_step - 1 (steps 1 through checkpoint_step-1).
 88
 89        For rate-based files (1 header line + 1 data line every RATE steps),
 90        the number of data lines to keep is (checkpoint_step - 1) // RATE,
 91        plus the header line(s).
 92        """
 93        from aegis_sim.parameterization import parametermanager
 94
 95        step = checkpoint_step
 96
 97        # Per-step files: 1 line per step, no header. Keep step-1 lines.
 98        per_step_files = [
 99            "popsize_before_reproduction.csv",
100            "popsize_after_reproduction.csv",
101            "eggnum_after_reproduction.csv",
102            "resources_before_scavenging.csv",
103            "resources_after_scavenging.csv",
104        ]
105        for fname in per_step_files:
106            self._truncate_file(self.odir / fname, keep_lines=step - 1)
107
108        # Rate-based files with 1 header line.
109        # Data is written when step % RATE == 0 or step == 1.
110        # Number of data lines at step S: 1 (for step 1) + count of multiples of RATE in [2, S-1]
111        # Simpler: lines where skip() returns False for steps 1..S-1
112
113        rate_specs_1header = [
114            ("progress.log", "LOGGING_RATE"),
115        ]
116        for fname, rate_name in rate_specs_1header:
117            rate = getattr(parametermanager.parameters, rate_name)
118            n_data = self._count_recordings(step - 1, rate)
119            self._truncate_file(self.odir / fname, keep_lines=1 + n_data)
120
121        # FlushRecorder spectra: 1 header + data at INTERVAL_RATE
122        spectra_dir = self.odir / "gui" / "spectra"
123        if spectra_dir.exists():
124            rate = parametermanager.parameters.INTERVAL_RATE
125            n_data = self._count_recordings(step - 1, rate)
126            for csv_file in spectra_dir.glob("*.csv"):
127                self._truncate_file(csv_file, keep_lines=1 + n_data)
128
129        # IntervalRecorder: 2 header lines + data at INTERVAL_RATE
130        rate = parametermanager.parameters.INTERVAL_RATE
131        n_data = self._count_recordings(step - 1, rate)
132        for fname in ["gui/genotypes.csv", "gui/phenotypes.csv"]:
133            self._truncate_file(self.odir / fname, keep_lines=2 + n_data)
134
135        # PopgenStatsRecorder: no header, data at POPGENSTATS_RATE
136        popgen_dir = self.odir / "popgen"
137        if popgen_dir.exists():
138            rate = parametermanager.parameters.POPGENSTATS_RATE
139            n_data = self._count_recordings(step - 1, rate)
140            for csv_file in popgen_dir.glob("*.csv"):
141                self._truncate_file(csv_file, keep_lines=n_data)
142
143        # Envdriftmap: no header, writes when step % ENVDRIFT_RATE == 0 (NOT at step 1 unless divisible).
144        # This differs from skip() logic, so we count multiples directly.
145        rate = parametermanager.parameters.ENVDRIFT_RATE
146        if rate > 0:
147            n_data = (step - 1) // rate  # multiples of rate in [1, step-1]
148            self._truncate_file(self.odir / "envdriftmap.csv", keep_lines=n_data)
149
150        # TE files: numbered CSV files in te/ directory.
151        # A new TE file starts at step 1 and every TE_RATE steps.
152        # TE_number increments when a file is flushed (at TE_DURATION offset or final step).
153        # We need to figure out which TE file was in-progress at the checkpoint step
154        # and delete any files started after it.
155        te_rate = parametermanager.parameters.TE_RATE
156        te_duration = parametermanager.parameters.TE_DURATION
157        if te_rate > 0:
158            te_dir = self.odir / "te"
159            if te_dir.exists():
160                self._truncate_te_files(te_dir, step, te_rate, te_duration)
161
162        logging.info(f"Output files truncated to checkpoint step {step}.")

Truncate output files back to the checkpoint step to avoid duplicate data.

Called after initialize_recorders so self.odir is set. The checkpoint is saved at the end of run_step for a given step, and all recorders also write during that same run_step. So the output files already contain data for checkpoint_step. On resume the sim loop re-executes from checkpoint_step, so we need to remove data from checkpoint_step onward.

For per-step files (1 line per step, no header), the number of lines to keep is checkpoint_step - 1 (steps 1 through checkpoint_step-1).

For rate-based files (1 header line + 1 data line every RATE steps), the number of data lines to keep is (checkpoint_step - 1) // RATE, plus the header line(s).

def initialize_recorders(self, TICKER_RATE, resuming=False):
    """Create one recorder of each kind, all writing under ``self.odir``.

    Recorders whose output files are appended to across restarts receive
    the ``resuming`` flag so they can reopen instead of overwrite; the
    ticker additionally receives ``TICKER_RATE``.
    """
    odir = self.odir  # every recorder targets the same output directory
    self.terecorder = TERecorder(odir=odir, resuming=resuming)
    self.picklerecorder = PickleRecorder(odir=odir)
    self.popgenstatsrecorder = PopgenStatsRecorder(odir=odir)
    self.guirecorder = IntervalRecorder(odir=odir, resuming=resuming)
    self.flushrecorder = FlushRecorder(odir=odir, resuming=resuming)
    self.featherrecorder = FeatherRecorder(odir=odir)
    self.phenomaprecorder = PhenomapRecorder(odir=odir)
    self.summaryrecorder = SummaryRecorder(odir=odir)
    self.progressrecorder = ProgressRecorder(odir=odir, resuming=resuming)
    self.simpleprogressrecorder = SimpleProgressRecorder(odir=odir)
    self.ticker = Ticker(odir=odir, TICKER_RATE=TICKER_RATE)
    self.popsizerecorder = PopsizeRecorder(odir=odir)
    self.resourcerecorder = ResourcesRecorder(odir=odir)
    self.configrecorder = ConfigRecorder(odir=odir)
    self.envdriftmaprecorder = Envdriftmaprecorder(odir=odir)
    self.checkpointrecorder = CheckpointRecorder(odir=odir)
@staticmethod
def make_odir(custom_config_path, overwrite) -> pathlib.Path:
    """Derive the output directory path from the config file path.

    The output directory is the config path with its suffix stripped
    (e.g. ``runs/sim.yml`` -> ``runs/sim``). The directory itself is
    NOT created here; only the path is computed and, if necessary,
    cleared.

    Parameters
    ----------
    custom_config_path : pathlib.Path
        Path to the .yml configuration file.
    overwrite : bool
        If True, anything already at the target location is removed.

    Returns
    -------
    pathlib.Path
        The (not yet created) output directory path.

    Raises
    ------
    FileExistsError
        If the target exists and ``overwrite`` is False.
    """
    output_path = custom_config_path.parent / custom_config_path.stem  # remove .yml
    if output_path.exists():
        if not overwrite:
            # FileExistsError subclasses Exception, so callers catching
            # the previous generic Exception continue to work.
            raise FileExistsError(f"{output_path} already exists. To overwrite, add flag --overwrite or -o.")
        # Previously only `exists() and is_dir()` counted as occupied, so a
        # stale *file* at the target slipped through and broke later
        # directory creation. Remove whatever is there.
        if output_path.is_dir():
            shutil.rmtree(output_path)
        else:
            output_path.unlink()
    return output_path
@staticmethod
def make_subfolders(paths):
    """Ensure every directory in ``paths`` exists.

    Missing parent directories are created as well; directories that
    already exist are left untouched.
    """
    # TODO relink paths that are now in classes
    for subfolder in paths:
        subfolder.mkdir(parents=True, exist_ok=True)
def is_extinct(self) -> bool:
    """Report whether the population has died out.

    Delegates to the summary recorder's ``extinct`` flag and logs the
    extinction step only when the flag is set.
    """
    if not self.summaryrecorder.extinct:
        return False
    logging.info(f"Population went extinct (at step {variables.steps}).")
    return True