aegis_sim.recording.recordingmanager

Data recorder

Records data generated by the simulation.

When thinking about recording additional data, consider that there are four recording methods: I. Snapshots (record data from the population at a specific step) II. Flushes (collect data over time then flush) III. One-time records IV. Other: TE records

  1"""Data recorder
  2
  3Records data generated by the simulation.
  4
  5When thinking about recording additional data, consider that there are four recording methods:
  6    I. Snapshots (record data from the population at a specific step)
  7    II. Flushes (collect data over time then flush)
  8    III. One-time records
  9    IV. Other: TE records
 10"""
 11
 12import pathlib
 13import shutil
 14import logging
 15
 16from aegis_sim import variables
 17
 18from .terecorder import TERecorder
 19from .picklerecorder import PickleRecorder
 20from .popgenstatsrecorder import PopgenStatsRecorder
 21from .intervalrecorder import IntervalRecorder
 22from .flushrecorder import FlushRecorder
 23from .featherrecorder import FeatherRecorder
 24from .phenomaprecorder import PhenomapRecorder
 25from .summaryrecorder import SummaryRecorder
 26from .progressrecorder import ProgressRecorder
 27from .simpleprogressrecorder import SimpleProgressRecorder
 28from .popsizerecorder import PopsizeRecorder
 29from .resourcerecorder import ResourcesRecorder
 30from .ticker import Ticker
 31from .configrecorder import ConfigRecorder
 32from .envdriftmaprecorder import Envdriftmaprecorder
 33from .checkpointrecorder import CheckpointRecorder
 34from .ancestryrecorder import AncestryRecorder
 35
 36# TODO write tests
 37
 38
 39class RecordingManager:
 40    """
 41    Container class for various recorders.
 42    Each recorder records a certain type of data.
 43    Most recorders record data as tables, except SummaryRecorder and PickleRecorder which record JSON files and pickles (a binary python format).
 44    Headers and indexes of all tabular files are explicitly recorded.
 45
 46    -----
 47    GUI
 48    AEGIS records a lot of different data.
 49    In brief, AEGIS records
 50    genomic data (population-level allele frequencies and individual-level binary sequences) and
 51    phenotypic data (observed population-level phenotypes and intrinsic individual-level phenotypes),
 52    as well as
 53    derived demographic data (life, death and birth tables),
 54    population genetic data (e.g. effective population size, theta), and
 55    survival analysis data (TE / time-event tables).
 56    Furthermore, it records metadata (e.g. simulation log, processed configuration files) and python pickle files.
 57
 58    Recorded data is distributed in multiple files.
 59    Almost all data are tabular, so each file is a table to which rows are appended as the simulation is running.
 60    The recording rates are frequencies at which rows are added; they are expressed in simulation steps.
 61    """
 62
 63    def init(self, custom_config_path, overwrite):
 64        self.odir = self.make_odir(custom_config_path=custom_config_path, overwrite=overwrite)
 65        self.resuming = False
 66
 67    def init_for_resume(self, custom_config_path):
 68        """Initialize for resume mode — reuse existing output directory, no overwrite."""
 69        output_path = custom_config_path.parent / custom_config_path.stem
 70        if not output_path.exists():
 71            raise FileNotFoundError(
 72                f"Cannot resume: output directory {output_path} does not exist."
 73            )
 74        self.odir = output_path
 75        self.resuming = True
 76
 77    def truncate_for_resume(self, checkpoint_step):
 78        """Truncate output files back to the checkpoint step to avoid duplicate data.
 79
 80        Called after initialize_recorders so self.odir is set. The checkpoint is
 81        saved at the end of run_step for a given step, and all recorders also
 82        write during that same run_step. So the output files already contain data
 83        for checkpoint_step. On resume the sim loop re-executes from checkpoint_step,
 84        so we need to remove data from checkpoint_step onward.
 85
 86        For per-step files (1 line per step, no header), the number of lines to
 87        keep is checkpoint_step - 1 (steps 1 through checkpoint_step-1).
 88
 89        For rate-based files (1 header line + 1 data line every RATE steps),
 90        the number of data lines to keep is (checkpoint_step - 1) // RATE,
 91        plus the header line(s).
 92        """
 93        from aegis_sim.parameterization import parametermanager
 94
 95        step = checkpoint_step
 96
 97        # Per-step files: 1 line per step, no header. Keep step-1 lines.
 98        per_step_files = [
 99            "popsize_before_reproduction.csv",
100            "popsize_after_reproduction.csv",
101            "eggnum_after_reproduction.csv",
102            "resources_before_scavenging.csv",
103            "resources_after_scavenging.csv",
104        ]
105        for fname in per_step_files:
106            self._truncate_file(self.odir / fname, keep_lines=step - 1)
107
108        # Rate-based files with 1 header line.
109        # Data is written when step % RATE == 0 or step == 1.
110        # Number of data lines at step S: 1 (for step 1) + count of multiples of RATE in [2, S-1]
111        # Simpler: lines where skip() returns False for steps 1..S-1
112
113        rate_specs_1header = [
114            ("progress.log", "LOGGING_RATE"),
115        ]
116        for fname, rate_name in rate_specs_1header:
117            rate = getattr(parametermanager.parameters, rate_name)
118            n_data = self._count_recordings(step - 1, rate)
119            self._truncate_file(self.odir / fname, keep_lines=1 + n_data)
120
121        # FlushRecorder spectra: 1 header + data at INTERVAL_RATE
122        spectra_dir = self.odir / "gui" / "spectra"
123        if spectra_dir.exists():
124            rate = parametermanager.parameters.INTERVAL_RATE
125            n_data = self._count_recordings(step - 1, rate)
126            for csv_file in spectra_dir.glob("*.csv"):
127                self._truncate_file(csv_file, keep_lines=1 + n_data)
128
129        # IntervalRecorder: 2 header lines + data at INTERVAL_RATE
130        rate = parametermanager.parameters.INTERVAL_RATE
131        n_data = self._count_recordings(step - 1, rate)
132        for fname in ["gui/genotypes.csv", "gui/phenotypes.csv"]:
133            self._truncate_file(self.odir / fname, keep_lines=2 + n_data)
134
135        # PopgenStatsRecorder: no header, data at POPGENSTATS_RATE
136        popgen_dir = self.odir / "popgen"
137        if popgen_dir.exists():
138            rate = parametermanager.parameters.POPGENSTATS_RATE
139            n_data = self._count_recordings(step - 1, rate)
140            for csv_file in popgen_dir.glob("*.csv"):
141                self._truncate_file(csv_file, keep_lines=n_data)
142
143        # Envdriftmap: no header, writes when step % ENVDRIFT_RATE == 0 (NOT at step 1 unless divisible).
144        # This differs from skip() logic, so we count multiples directly.
145        rate = parametermanager.parameters.ENVDRIFT_RATE
146        if rate > 0:
147            n_data = (step - 1) // rate  # multiples of rate in [1, step-1]
148            self._truncate_file(self.odir / "envdriftmap.csv", keep_lines=n_data)
149
150        # TE files: numbered CSV files in te/ directory.
151        # A new TE file starts at step 1 and every TE_RATE steps.
152        # TE_number increments when a file is flushed (at TE_DURATION offset or final step).
153        # We need to figure out which TE file was in-progress at the checkpoint step
154        # and delete any files started after it.
155        te_rate = parametermanager.parameters.TE_RATE
156        te_duration = parametermanager.parameters.TE_DURATION
157        if te_rate > 0:
158            te_dir = self.odir / "te"
159            if te_dir.exists():
160                self._truncate_te_files(te_dir, step, te_rate, te_duration)
161
162        logging.info(f"Output files truncated to checkpoint step {step}.")
163
164    @staticmethod
165    def _truncate_te_files(te_dir, checkpoint_step, te_rate, te_duration):
166        """Handle TE file truncation on resume.
167
168        TE files are numbered 0.csv, 1.csv, etc. A new collection window opens
169        at step 1 and every TE_RATE steps. The file number increments when the
170        window is flushed. We need to:
171        1. Determine how many complete TE windows finished before checkpoint_step
172        2. Delete any TE files beyond that
173        3. Truncate the in-progress TE file (remove data recorded at/after checkpoint_step)
174        """
175        # Count how many TE windows were fully completed before checkpoint_step.
176        # A window starts at step S where S % te_rate == 0 (or step 1).
177        # It flushes at S + te_duration (or at STEPS_PER_SIMULATION).
178        # Window 0 starts at step 1, flushes at step te_duration (if te_duration < te_rate).
179        # Window i starts at step i*te_rate, flushes at step i*te_rate + te_duration.
180        # A window is "complete" if its flush step < checkpoint_step.
181
182        # Number of complete windows: windows that started AND flushed before checkpoint_step
183        # Window starts: step 1, te_rate, 2*te_rate, 3*te_rate, ...
184        # The window starting at step W flushes at step W + te_duration.
185        # Complete if W + te_duration < checkpoint_step.
186
187        # For simplicity, just count existing TE files and remove those whose
188        # start step >= checkpoint_step.
189        # Window 0 starts at step 1.
190        # Window k starts at step k * te_rate (for k >= 1), or step 1 for k=0.
191
192        existing_files = sorted(te_dir.glob("*.csv"), key=lambda p: int(p.stem))
193        for f in existing_files:
194            file_num = int(f.stem)
195            # Window file_num starts at: step 1 if file_num==0, else file_num * te_rate
196            window_start = 1 if file_num == 0 else file_num * te_rate
197            if window_start >= checkpoint_step:
198                # This window started at or after checkpoint — delete it
199                f.unlink()
200            # If the window started before checkpoint but may contain data from
201            # steps >= checkpoint_step, we need to truncate those lines.
202            # TE files have: 1 header line ("T,E"), then data lines for each
203            # death event recorded during the window. We can't easily map lines
204            # to steps, so we leave partial windows as-is. The resumed sim will
205            # re-open a new window at the appropriate step, and the old partial
206            # data from the interrupted window is acceptable (it's death events
207            # that actually happened).
208
209    @staticmethod
210    def _count_recordings(up_to_step, rate):
211        """Count how many times a recorder with given rate would have written for steps 1..up_to_step.
212
213        Mirrors the skip() logic: always write at step 1, then write at every step divisible by rate.
214        """
215        if rate <= 0 or up_to_step < 1:
216            return 0
217        # Step 1 always records
218        count = 1
219        if up_to_step >= 2:
220            # Multiples of rate in [1, up_to_step] = up_to_step // rate
221            # But step 1 is already counted, so subtract 1 if rate divides 1 (only when rate == 1)
222            count += up_to_step // rate
223            if rate == 1:
224                count -= 1
225        return count
226
227    @staticmethod
228    def _truncate_file(path, keep_lines):
229        """Truncate a file to keep only the first `keep_lines` lines."""
230        if not path.exists():
231            return
232        with open(path, "rb") as f:
233            lines = f.readlines()
234        if len(lines) <= keep_lines:
235            return  # Nothing to truncate
236        with open(path, "wb") as f:
237            f.writelines(lines[:keep_lines])
238
239    def initialize_recorders(self, TICKER_RATE, resuming=False):
240        self.terecorder = TERecorder(odir=self.odir, resuming=resuming)
241        self.picklerecorder = PickleRecorder(odir=self.odir)
242        self.popgenstatsrecorder = PopgenStatsRecorder(odir=self.odir)
243        self.guirecorder = IntervalRecorder(odir=self.odir, resuming=resuming)
244        self.flushrecorder = FlushRecorder(odir=self.odir, resuming=resuming)
245        self.featherrecorder = FeatherRecorder(odir=self.odir)
246        self.phenomaprecorder = PhenomapRecorder(odir=self.odir)
247        self.summaryrecorder = SummaryRecorder(odir=self.odir)
248        self.progressrecorder = ProgressRecorder(odir=self.odir, resuming=resuming)
249        self.simpleprogressrecorder = SimpleProgressRecorder(odir=self.odir)
250        self.ticker = Ticker(odir=self.odir, TICKER_RATE=TICKER_RATE)
251        self.popsizerecorder = PopsizeRecorder(odir=self.odir)
252        self.resourcerecorder = ResourcesRecorder(odir=self.odir)
253        self.configrecorder = ConfigRecorder(odir=self.odir)
254        self.envdriftmaprecorder = Envdriftmaprecorder(odir=self.odir)
255        self.checkpointrecorder = CheckpointRecorder(odir=self.odir)
256        self.ancestryrecorder = AncestryRecorder(odir=self.odir)
257
258    #############
259    # UTILITIES #
260    #############
261
262    @staticmethod
263    def make_odir(custom_config_path, overwrite) -> pathlib.Path:
264        output_path = custom_config_path.parent / custom_config_path.stem  # remove .yml
265        is_occupied = output_path.exists() and output_path.is_dir()
266        if is_occupied:
267            if overwrite:
268                shutil.rmtree(output_path)
269            else:
270                raise Exception(f"{output_path} already exists. To overwrite, add flag --overwrite or -o.")
271        return output_path
272
273    @staticmethod
274    def make_subfolders(paths):
275        # TODO relink paths that are now in classes
276        for path in paths:
277            path.mkdir(exist_ok=True, parents=True)
278
279    def is_extinct(self) -> bool:
280        if self.summaryrecorder.extinct:
281            logging.info(f"Population went extinct (at step {variables.steps}).")
282            return True
283        return False
class RecordingManager:
 40class RecordingManager:
 41    """
 42    Container class for various recorders.
 43    Each recorder records a certain type of data.
 44    Most recorders record data as tables, except SummaryRecorder and PickleRecorder which record JSON files and pickles (a binary python format).
 45    Headers and indexes of all tabular files are explicitly recorded.
 46
 47    -----
 48    GUI
 49    AEGIS records a lot of different data.
 50    In brief, AEGIS records
 51    genomic data (population-level allele frequencies and individual-level binary sequences) and
 52    phenotypic data (observed population-level phenotypes and intrinsic individual-level phenotypes),
 53    as well as
 54    derived demographic data (life, death and birth tables),
 55    population genetic data (e.g. effective population size, theta), and
 56    survival analysis data (TE / time-event tables).
 57    Furthermore, it records metadata (e.g. simulation log, processed configuration files) and python pickle files.
 58
 59    Recorded data is distributed in multiple files.
 60    Almost all data are tabular, so each file is a table to which rows are appended as the simulation is running.
 61    The recording rates are frequencies at which rows are added; they are expressed in simulation steps.
 62    """
 63
 64    def init(self, custom_config_path, overwrite):
 65        self.odir = self.make_odir(custom_config_path=custom_config_path, overwrite=overwrite)
 66        self.resuming = False
 67
 68    def init_for_resume(self, custom_config_path):
 69        """Initialize for resume mode — reuse existing output directory, no overwrite."""
 70        output_path = custom_config_path.parent / custom_config_path.stem
 71        if not output_path.exists():
 72            raise FileNotFoundError(
 73                f"Cannot resume: output directory {output_path} does not exist."
 74            )
 75        self.odir = output_path
 76        self.resuming = True
 77
 78    def truncate_for_resume(self, checkpoint_step):
 79        """Truncate output files back to the checkpoint step to avoid duplicate data.
 80
 81        Called after initialize_recorders so self.odir is set. The checkpoint is
 82        saved at the end of run_step for a given step, and all recorders also
 83        write during that same run_step. So the output files already contain data
 84        for checkpoint_step. On resume the sim loop re-executes from checkpoint_step,
 85        so we need to remove data from checkpoint_step onward.
 86
 87        For per-step files (1 line per step, no header), the number of lines to
 88        keep is checkpoint_step - 1 (steps 1 through checkpoint_step-1).
 89
 90        For rate-based files (1 header line + 1 data line every RATE steps),
 91        the number of data lines to keep is (checkpoint_step - 1) // RATE,
 92        plus the header line(s).
 93        """
 94        from aegis_sim.parameterization import parametermanager
 95
 96        step = checkpoint_step
 97
 98        # Per-step files: 1 line per step, no header. Keep step-1 lines.
 99        per_step_files = [
100            "popsize_before_reproduction.csv",
101            "popsize_after_reproduction.csv",
102            "eggnum_after_reproduction.csv",
103            "resources_before_scavenging.csv",
104            "resources_after_scavenging.csv",
105        ]
106        for fname in per_step_files:
107            self._truncate_file(self.odir / fname, keep_lines=step - 1)
108
109        # Rate-based files with 1 header line.
110        # Data is written when step % RATE == 0 or step == 1.
111        # Number of data lines at step S: 1 (for step 1) + count of multiples of RATE in [2, S-1]
112        # Simpler: lines where skip() returns False for steps 1..S-1
113
114        rate_specs_1header = [
115            ("progress.log", "LOGGING_RATE"),
116        ]
117        for fname, rate_name in rate_specs_1header:
118            rate = getattr(parametermanager.parameters, rate_name)
119            n_data = self._count_recordings(step - 1, rate)
120            self._truncate_file(self.odir / fname, keep_lines=1 + n_data)
121
122        # FlushRecorder spectra: 1 header + data at INTERVAL_RATE
123        spectra_dir = self.odir / "gui" / "spectra"
124        if spectra_dir.exists():
125            rate = parametermanager.parameters.INTERVAL_RATE
126            n_data = self._count_recordings(step - 1, rate)
127            for csv_file in spectra_dir.glob("*.csv"):
128                self._truncate_file(csv_file, keep_lines=1 + n_data)
129
130        # IntervalRecorder: 2 header lines + data at INTERVAL_RATE
131        rate = parametermanager.parameters.INTERVAL_RATE
132        n_data = self._count_recordings(step - 1, rate)
133        for fname in ["gui/genotypes.csv", "gui/phenotypes.csv"]:
134            self._truncate_file(self.odir / fname, keep_lines=2 + n_data)
135
136        # PopgenStatsRecorder: no header, data at POPGENSTATS_RATE
137        popgen_dir = self.odir / "popgen"
138        if popgen_dir.exists():
139            rate = parametermanager.parameters.POPGENSTATS_RATE
140            n_data = self._count_recordings(step - 1, rate)
141            for csv_file in popgen_dir.glob("*.csv"):
142                self._truncate_file(csv_file, keep_lines=n_data)
143
144        # Envdriftmap: no header, writes when step % ENVDRIFT_RATE == 0 (NOT at step 1 unless divisible).
145        # This differs from skip() logic, so we count multiples directly.
146        rate = parametermanager.parameters.ENVDRIFT_RATE
147        if rate > 0:
148            n_data = (step - 1) // rate  # multiples of rate in [1, step-1]
149            self._truncate_file(self.odir / "envdriftmap.csv", keep_lines=n_data)
150
151        # TE files: numbered CSV files in te/ directory.
152        # A new TE file starts at step 1 and every TE_RATE steps.
153        # TE_number increments when a file is flushed (at TE_DURATION offset or final step).
154        # We need to figure out which TE file was in-progress at the checkpoint step
155        # and delete any files started after it.
156        te_rate = parametermanager.parameters.TE_RATE
157        te_duration = parametermanager.parameters.TE_DURATION
158        if te_rate > 0:
159            te_dir = self.odir / "te"
160            if te_dir.exists():
161                self._truncate_te_files(te_dir, step, te_rate, te_duration)
162
163        logging.info(f"Output files truncated to checkpoint step {step}.")
164
165    @staticmethod
166    def _truncate_te_files(te_dir, checkpoint_step, te_rate, te_duration):
167        """Handle TE file truncation on resume.
168
169        TE files are numbered 0.csv, 1.csv, etc. A new collection window opens
170        at step 1 and every TE_RATE steps. The file number increments when the
171        window is flushed. We need to:
172        1. Determine how many complete TE windows finished before checkpoint_step
173        2. Delete any TE files beyond that
174        3. Truncate the in-progress TE file (remove data recorded at/after checkpoint_step)
175        """
176        # Count how many TE windows were fully completed before checkpoint_step.
177        # A window starts at step S where S % te_rate == 0 (or step 1).
178        # It flushes at S + te_duration (or at STEPS_PER_SIMULATION).
179        # Window 0 starts at step 1, flushes at step te_duration (if te_duration < te_rate).
180        # Window i starts at step i*te_rate, flushes at step i*te_rate + te_duration.
181        # A window is "complete" if its flush step < checkpoint_step.
182
183        # Number of complete windows: windows that started AND flushed before checkpoint_step
184        # Window starts: step 1, te_rate, 2*te_rate, 3*te_rate, ...
185        # The window starting at step W flushes at step W + te_duration.
186        # Complete if W + te_duration < checkpoint_step.
187
188        # For simplicity, just count existing TE files and remove those whose
189        # start step >= checkpoint_step.
190        # Window 0 starts at step 1.
191        # Window k starts at step k * te_rate (for k >= 1), or step 1 for k=0.
192
193        existing_files = sorted(te_dir.glob("*.csv"), key=lambda p: int(p.stem))
194        for f in existing_files:
195            file_num = int(f.stem)
196            # Window file_num starts at: step 1 if file_num==0, else file_num * te_rate
197            window_start = 1 if file_num == 0 else file_num * te_rate
198            if window_start >= checkpoint_step:
199                # This window started at or after checkpoint — delete it
200                f.unlink()
201            # If the window started before checkpoint but may contain data from
202            # steps >= checkpoint_step, we need to truncate those lines.
203            # TE files have: 1 header line ("T,E"), then data lines for each
204            # death event recorded during the window. We can't easily map lines
205            # to steps, so we leave partial windows as-is. The resumed sim will
206            # re-open a new window at the appropriate step, and the old partial
207            # data from the interrupted window is acceptable (it's death events
208            # that actually happened).
209
210    @staticmethod
211    def _count_recordings(up_to_step, rate):
212        """Count how many times a recorder with given rate would have written for steps 1..up_to_step.
213
214        Mirrors the skip() logic: always write at step 1, then write at every step divisible by rate.
215        """
216        if rate <= 0 or up_to_step < 1:
217            return 0
218        # Step 1 always records
219        count = 1
220        if up_to_step >= 2:
221            # Multiples of rate in [1, up_to_step] = up_to_step // rate
222            # But step 1 is already counted, so subtract 1 if rate divides 1 (only when rate == 1)
223            count += up_to_step // rate
224            if rate == 1:
225                count -= 1
226        return count
227
228    @staticmethod
229    def _truncate_file(path, keep_lines):
230        """Truncate a file to keep only the first `keep_lines` lines."""
231        if not path.exists():
232            return
233        with open(path, "rb") as f:
234            lines = f.readlines()
235        if len(lines) <= keep_lines:
236            return  # Nothing to truncate
237        with open(path, "wb") as f:
238            f.writelines(lines[:keep_lines])
239
240    def initialize_recorders(self, TICKER_RATE, resuming=False):
241        self.terecorder = TERecorder(odir=self.odir, resuming=resuming)
242        self.picklerecorder = PickleRecorder(odir=self.odir)
243        self.popgenstatsrecorder = PopgenStatsRecorder(odir=self.odir)
244        self.guirecorder = IntervalRecorder(odir=self.odir, resuming=resuming)
245        self.flushrecorder = FlushRecorder(odir=self.odir, resuming=resuming)
246        self.featherrecorder = FeatherRecorder(odir=self.odir)
247        self.phenomaprecorder = PhenomapRecorder(odir=self.odir)
248        self.summaryrecorder = SummaryRecorder(odir=self.odir)
249        self.progressrecorder = ProgressRecorder(odir=self.odir, resuming=resuming)
250        self.simpleprogressrecorder = SimpleProgressRecorder(odir=self.odir)
251        self.ticker = Ticker(odir=self.odir, TICKER_RATE=TICKER_RATE)
252        self.popsizerecorder = PopsizeRecorder(odir=self.odir)
253        self.resourcerecorder = ResourcesRecorder(odir=self.odir)
254        self.configrecorder = ConfigRecorder(odir=self.odir)
255        self.envdriftmaprecorder = Envdriftmaprecorder(odir=self.odir)
256        self.checkpointrecorder = CheckpointRecorder(odir=self.odir)
257        self.ancestryrecorder = AncestryRecorder(odir=self.odir)
258
259    #############
260    # UTILITIES #
261    #############
262
263    @staticmethod
264    def make_odir(custom_config_path, overwrite) -> pathlib.Path:
265        output_path = custom_config_path.parent / custom_config_path.stem  # remove .yml
266        is_occupied = output_path.exists() and output_path.is_dir()
267        if is_occupied:
268            if overwrite:
269                shutil.rmtree(output_path)
270            else:
271                raise Exception(f"{output_path} already exists. To overwrite, add flag --overwrite or -o.")
272        return output_path
273
274    @staticmethod
275    def make_subfolders(paths):
276        # TODO relink paths that are now in classes
277        for path in paths:
278            path.mkdir(exist_ok=True, parents=True)
279
280    def is_extinct(self) -> bool:
281        if self.summaryrecorder.extinct:
282            logging.info(f"Population went extinct (at step {variables.steps}).")
283            return True
284        return False

Container class for various recorders. Each recorder records a certain type of data. Most recorders record data as tables, except SummaryRecorder and PickleRecorder which record JSON files and pickles (a binary python format). Headers and indexes of all tabular files are explicitly recorded.


GUI AEGIS records a lot of different data. In brief, AEGIS records genomic data (population-level allele frequencies and individual-level binary sequences) and phenotypic data (observed population-level phenotypes and intrinsic individual-level phenotypes), as well as derived demographic data (life, death and birth tables), population genetic data (e.g. effective population size, theta), and survival analysis data (TE / time-event tables). Furthermore, it records metadata (e.g. simulation log, processed configuration files) and python pickle files.

Recorded data is distributed in multiple files. Almost all data are tabular, so each file is a table to which rows are appended as the simulation is running. The recording rates are frequencies at which rows are added; they are expressed in simulation steps.

def init(self, custom_config_path, overwrite):
64    def init(self, custom_config_path, overwrite):
65        self.odir = self.make_odir(custom_config_path=custom_config_path, overwrite=overwrite)
66        self.resuming = False
def init_for_resume(self, custom_config_path):
68    def init_for_resume(self, custom_config_path):
69        """Initialize for resume mode — reuse existing output directory, no overwrite."""
70        output_path = custom_config_path.parent / custom_config_path.stem
71        if not output_path.exists():
72            raise FileNotFoundError(
73                f"Cannot resume: output directory {output_path} does not exist."
74            )
75        self.odir = output_path
76        self.resuming = True

Initialize for resume mode — reuse existing output directory, no overwrite.

def truncate_for_resume(self, checkpoint_step):
 78    def truncate_for_resume(self, checkpoint_step):
 79        """Truncate output files back to the checkpoint step to avoid duplicate data.
 80
 81        Called after initialize_recorders so self.odir is set. The checkpoint is
 82        saved at the end of run_step for a given step, and all recorders also
 83        write during that same run_step. So the output files already contain data
 84        for checkpoint_step. On resume the sim loop re-executes from checkpoint_step,
 85        so we need to remove data from checkpoint_step onward.
 86
 87        For per-step files (1 line per step, no header), the number of lines to
 88        keep is checkpoint_step - 1 (steps 1 through checkpoint_step-1).
 89
 90        For rate-based files (1 header line + 1 data line every RATE steps),
 91        the number of data lines to keep is (checkpoint_step - 1) // RATE,
 92        plus the header line(s).
 93        """
 94        from aegis_sim.parameterization import parametermanager
 95
 96        step = checkpoint_step
 97
 98        # Per-step files: 1 line per step, no header. Keep step-1 lines.
 99        per_step_files = [
100            "popsize_before_reproduction.csv",
101            "popsize_after_reproduction.csv",
102            "eggnum_after_reproduction.csv",
103            "resources_before_scavenging.csv",
104            "resources_after_scavenging.csv",
105        ]
106        for fname in per_step_files:
107            self._truncate_file(self.odir / fname, keep_lines=step - 1)
108
109        # Rate-based files with 1 header line.
110        # Data is written when step % RATE == 0 or step == 1.
111        # Number of data lines at step S: 1 (for step 1) + count of multiples of RATE in [2, S-1]
112        # Simpler: lines where skip() returns False for steps 1..S-1
113
114        rate_specs_1header = [
115            ("progress.log", "LOGGING_RATE"),
116        ]
117        for fname, rate_name in rate_specs_1header:
118            rate = getattr(parametermanager.parameters, rate_name)
119            n_data = self._count_recordings(step - 1, rate)
120            self._truncate_file(self.odir / fname, keep_lines=1 + n_data)
121
122        # FlushRecorder spectra: 1 header + data at INTERVAL_RATE
123        spectra_dir = self.odir / "gui" / "spectra"
124        if spectra_dir.exists():
125            rate = parametermanager.parameters.INTERVAL_RATE
126            n_data = self._count_recordings(step - 1, rate)
127            for csv_file in spectra_dir.glob("*.csv"):
128                self._truncate_file(csv_file, keep_lines=1 + n_data)
129
130        # IntervalRecorder: 2 header lines + data at INTERVAL_RATE
131        rate = parametermanager.parameters.INTERVAL_RATE
132        n_data = self._count_recordings(step - 1, rate)
133        for fname in ["gui/genotypes.csv", "gui/phenotypes.csv"]:
134            self._truncate_file(self.odir / fname, keep_lines=2 + n_data)
135
136        # PopgenStatsRecorder: no header, data at POPGENSTATS_RATE
137        popgen_dir = self.odir / "popgen"
138        if popgen_dir.exists():
139            rate = parametermanager.parameters.POPGENSTATS_RATE
140            n_data = self._count_recordings(step - 1, rate)
141            for csv_file in popgen_dir.glob("*.csv"):
142                self._truncate_file(csv_file, keep_lines=n_data)
143
144        # Envdriftmap: no header, writes when step % ENVDRIFT_RATE == 0 (NOT at step 1 unless divisible).
145        # This differs from skip() logic, so we count multiples directly.
146        rate = parametermanager.parameters.ENVDRIFT_RATE
147        if rate > 0:
148            n_data = (step - 1) // rate  # multiples of rate in [1, step-1]
149            self._truncate_file(self.odir / "envdriftmap.csv", keep_lines=n_data)
150
151        # TE files: numbered CSV files in te/ directory.
152        # A new TE file starts at step 1 and every TE_RATE steps.
153        # TE_number increments when a file is flushed (at TE_DURATION offset or final step).
154        # We need to figure out which TE file was in-progress at the checkpoint step
155        # and delete any files started after it.
156        te_rate = parametermanager.parameters.TE_RATE
157        te_duration = parametermanager.parameters.TE_DURATION
158        if te_rate > 0:
159            te_dir = self.odir / "te"
160            if te_dir.exists():
161                self._truncate_te_files(te_dir, step, te_rate, te_duration)
162
163        logging.info(f"Output files truncated to checkpoint step {step}.")

Truncate output files back to the checkpoint step to avoid duplicate data.

Called after initialize_recorders so self.odir is set. The checkpoint is saved at the end of run_step for a given step, and all recorders also write during that same run_step. So the output files already contain data for checkpoint_step. On resume the sim loop re-executes from checkpoint_step, so we need to remove data from checkpoint_step onward.

For per-step files (1 line per step, no header), the number of lines to keep is checkpoint_step - 1 (steps 1 through checkpoint_step-1).

For rate-based files (header line(s) followed by one data line per recording), a recorder writes at step 1 and at every step divisible by RATE, so the number of data lines to keep is the number of such steps in 1..checkpoint_step-1, plus the header line(s).

def initialize_recorders(self, TICKER_RATE, resuming=False):
240    def initialize_recorders(self, TICKER_RATE, resuming=False):
241        self.terecorder = TERecorder(odir=self.odir, resuming=resuming)
242        self.picklerecorder = PickleRecorder(odir=self.odir)
243        self.popgenstatsrecorder = PopgenStatsRecorder(odir=self.odir)
244        self.guirecorder = IntervalRecorder(odir=self.odir, resuming=resuming)
245        self.flushrecorder = FlushRecorder(odir=self.odir, resuming=resuming)
246        self.featherrecorder = FeatherRecorder(odir=self.odir)
247        self.phenomaprecorder = PhenomapRecorder(odir=self.odir)
248        self.summaryrecorder = SummaryRecorder(odir=self.odir)
249        self.progressrecorder = ProgressRecorder(odir=self.odir, resuming=resuming)
250        self.simpleprogressrecorder = SimpleProgressRecorder(odir=self.odir)
251        self.ticker = Ticker(odir=self.odir, TICKER_RATE=TICKER_RATE)
252        self.popsizerecorder = PopsizeRecorder(odir=self.odir)
253        self.resourcerecorder = ResourcesRecorder(odir=self.odir)
254        self.configrecorder = ConfigRecorder(odir=self.odir)
255        self.envdriftmaprecorder = Envdriftmaprecorder(odir=self.odir)
256        self.checkpointrecorder = CheckpointRecorder(odir=self.odir)
257        self.ancestryrecorder = AncestryRecorder(odir=self.odir)
@staticmethod
def make_odir(custom_config_path, overwrite) -> pathlib.Path:
263    @staticmethod
264    def make_odir(custom_config_path, overwrite) -> pathlib.Path:
265        output_path = custom_config_path.parent / custom_config_path.stem  # remove .yml
266        is_occupied = output_path.exists() and output_path.is_dir()
267        if is_occupied:
268            if overwrite:
269                shutil.rmtree(output_path)
270            else:
271                raise Exception(f"{output_path} already exists. To overwrite, add flag --overwrite or -o.")
272        return output_path
@staticmethod
def make_subfolders(paths):
274    @staticmethod
275    def make_subfolders(paths):
276        # TODO relink paths that are now in classes
277        for path in paths:
278            path.mkdir(exist_ok=True, parents=True)
def is_extinct(self) -> bool:
280    def is_extinct(self) -> bool:
281        if self.summaryrecorder.extinct:
282            logging.info(f"Population went extinct (at step {variables.steps}).")
283            return True
284        return False